Skip to content

Commit

Permalink
Fully async Multipart Form handling (#9975)
Browse files Browse the repository at this point in the history
A fully async ContentSourceCompletableFuture for use by MultiPartFormData and MultiPartByteRanges
Restructure MultiPartFormData to have a Parser class
---------

Signed-off-by: Simone Bordet <[email protected]>
Co-authored-by: Simone Bordet <[email protected]>
  • Loading branch information
gregw and sbordet authored Jun 30, 2023
1 parent dd44b30 commit ec2dbe7
Show file tree
Hide file tree
Showing 22 changed files with 1,494 additions and 1,206 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,18 @@

package org.eclipse.jetty.docs.programming;

import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;

import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.io.content.AsyncContent;
import org.eclipse.jetty.io.content.ContentSourceCompletableFuture;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.CharsetStringBuilder;
import org.eclipse.jetty.util.FutureCallback;
import org.eclipse.jetty.util.Utf8StringBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -146,8 +153,92 @@ public static void testEcho() throws Exception
throw new IllegalStateException("EOF expected");
}

public static class FutureString extends CompletableFuture<String>
{
private final CharsetStringBuilder text;
private final Content.Source source;

public FutureString(Content.Source source, Charset charset)
{
this.source = source;
this.text = CharsetStringBuilder.forCharset(charset);
source.demand(this::onContentAvailable);
}

private void onContentAvailable()
{
while (true)
{
Content.Chunk chunk = source.read();
if (chunk == null)
{
source.demand(this::onContentAvailable);
return;
}

try
{
if (Content.Chunk.isFailure(chunk))
throw chunk.getFailure();

if (chunk.hasRemaining())
text.append(chunk.getByteBuffer());

if (chunk.isLast() && complete(text.build()))
return;
}
catch (Throwable e)
{
completeExceptionally(e);
}
finally
{
chunk.release();
}
}
}
}

public static void testFutureString() throws Exception
{
AsyncContent source = new AsyncContent();
FutureString future = new FutureString(source, StandardCharsets.UTF_8);
if (future.isDone())
throw new IllegalStateException();

Callback.Completable writeCallback = new Callback.Completable();
Content.Sink.write(source, false, "One", writeCallback);
if (!writeCallback.isDone() || future.isDone())
throw new IllegalStateException("Should be consumed");
Content.Sink.write(source, false, "Two", writeCallback);
if (!writeCallback.isDone() || future.isDone())
throw new IllegalStateException("Should be consumed");
Content.Sink.write(source, true, "Three", writeCallback);
if (!writeCallback.isDone() || !future.isDone())
throw new IllegalStateException("Should be consumed");
}

public static class FutureUtf8String extends ContentSourceCompletableFuture<String>
{
private final Utf8StringBuilder builder = new Utf8StringBuilder();

public FutureUtf8String(Content.Source content)
{
super(content);
}

@Override
protected String parse(Content.Chunk chunk) throws Throwable
{
if (chunk.hasRemaining())
builder.append(chunk.getByteBuffer());
return chunk.isLast() ? builder.takeCompleteString(IllegalStateException::new) : null;
}
}

public static void main(String... args) throws Exception
{
testEcho();
testFutureString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -439,12 +439,12 @@ public boolean handle(Request request, Response response, Callback callback) thr
String contentType = request.getHeaders().get(HttpHeader.CONTENT_TYPE);
assertEquals("multipart/form-data", HttpField.valueParameters(contentType, null));
String boundary = MultiPart.extractBoundary(contentType);
MultiPartFormData formData = new MultiPartFormData(boundary);
MultiPartFormData.Parser formData = new MultiPartFormData.Parser(boundary);
formData.setFilesDirectory(tmpDir);
formData.parse(request);

try
{
process(formData.join()); // May block waiting for multipart form data.
process(formData.parse(request).join()); // May block waiting for multipart form data.
response.write(true, BufferUtil.EMPTY_BUFFER, callback);
}
catch (Exception x)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@
import java.util.concurrent.CompletableFuture;

import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.io.content.ContentSourceCompletableFuture;
import org.eclipse.jetty.util.thread.AutoLock;

/**
* <p>A {@link CompletableFuture} that is completed when a multipart/byteranges
* content has been parsed asynchronously from a {@link Content.Source} via
* {@link #parse(Content.Source)}.</p>
* has been parsed asynchronously from a {@link Content.Source}.</p>
* <p>Once the parsing of the multipart/byteranges content completes successfully,
* objects of this class are completed with a {@link MultiPartByteRanges.Parts}
* object.</p>
Expand All @@ -52,75 +52,10 @@
*
* @see Parts
*/
public class MultiPartByteRanges extends CompletableFuture<MultiPartByteRanges.Parts>
public class MultiPartByteRanges
{
private final PartsListener listener = new PartsListener();
private final MultiPart.Parser parser;

public MultiPartByteRanges(String boundary)
{
this.parser = new MultiPart.Parser(boundary, listener);
}

/**
* @return the boundary string
*/
public String getBoundary()
{
return parser.getBoundary();
}

@Override
public boolean completeExceptionally(Throwable failure)
{
listener.fail(failure);
return super.completeExceptionally(failure);
}

/**
* <p>Parses the given multipart/byteranges content.</p>
* <p>Returns this {@code MultiPartByteRanges} object,
* so that it can be used in the typical "fluent" style
* of {@link CompletableFuture}.</p>
*
* @param content the multipart/byteranges content to parse
* @return this {@code MultiPartByteRanges} object
*/
public MultiPartByteRanges parse(Content.Source content)
{
new Runnable()
{
@Override
public void run()
{
while (true)
{
Content.Chunk chunk = content.read();
if (chunk == null)
{
content.demand(this);
return;
}
if (Content.Chunk.isFailure(chunk))
{
listener.onFailure(chunk.getFailure());
return;
}
parse(chunk);
chunk.release();
if (chunk.isLast() || isDone())
return;
}
}
}.run();
return this;
}

private void parse(Content.Chunk chunk)
private MultiPartByteRanges()
{
if (listener.isFailed())
return;
parser.parse(chunk);
}

/**
Expand Down Expand Up @@ -267,76 +202,123 @@ public Content.Source newContentSource()
}
}

private class PartsListener extends MultiPart.AbstractPartsListener
public static class Parser
{
private final AutoLock lock = new AutoLock();
private final List<Content.Chunk> partChunks = new ArrayList<>();
private final List<MultiPart.Part> parts = new ArrayList<>();
private Throwable failure;
private final PartsListener listener = new PartsListener();
private final MultiPart.Parser parser;
private Parts parts;

private boolean isFailed()
public Parser(String boundary)
{
try (AutoLock ignored = lock.lock())
{
return failure != null;
}
parser = new MultiPart.Parser(boundary, listener);
}

@Override
public void onPartContent(Content.Chunk chunk)
public CompletableFuture<MultiPartByteRanges.Parts> parse(Content.Source content)
{
try (AutoLock ignored = lock.lock())
ContentSourceCompletableFuture<MultiPartByteRanges.Parts> futureParts = new ContentSourceCompletableFuture<>(content)
{
// Retain the chunk because it is stored for later use.
chunk.retain();
partChunks.add(chunk);
}
@Override
protected MultiPartByteRanges.Parts parse(Content.Chunk chunk) throws Throwable
{
if (listener.isFailed())
throw listener.failure;
parser.parse(chunk);
if (listener.isFailed())
throw listener.failure;
return parts;
}

@Override
public boolean completeExceptionally(Throwable failure)
{
boolean failed = super.completeExceptionally(failure);
if (failed)
listener.fail(failure);
return failed;
}
};
futureParts.parse();
return futureParts;
}

@Override
public void onPart(String name, String fileName, HttpFields headers)
/**
* @return the boundary string
*/
public String getBoundary()
{
try (AutoLock ignored = lock.lock())
{
parts.add(new MultiPart.ChunksPart(name, fileName, headers, List.copyOf(partChunks)));
partChunks.forEach(Content.Chunk::release);
partChunks.clear();
}
return parser.getBoundary();
}

@Override
public void onComplete()
private class PartsListener extends MultiPart.AbstractPartsListener
{
super.onComplete();
List<MultiPart.Part> copy;
try (AutoLock ignored = lock.lock())
private final AutoLock lock = new AutoLock();
private final List<Content.Chunk> partChunks = new ArrayList<>();
private final List<MultiPart.Part> parts = new ArrayList<>();
private Throwable failure;

private boolean isFailed()
{
copy = List.copyOf(parts);
try (AutoLock ignored = lock.lock())
{
return failure != null;
}
}
complete(new Parts(getBoundary(), copy));
}

@Override
public void onFailure(Throwable failure)
{
super.onFailure(failure);
completeExceptionally(failure);
}
@Override
public void onPartContent(Content.Chunk chunk)
{
try (AutoLock ignored = lock.lock())
{
// Retain the chunk because it is stored for later use.
chunk.retain();
partChunks.add(chunk);
}
}

private void fail(Throwable cause)
{
List<MultiPart.Part> partsToFail;
try (AutoLock ignored = lock.lock())
@Override
public void onPart(String name, String fileName, HttpFields headers)
{
try (AutoLock ignored = lock.lock())
{
parts.add(new MultiPart.ChunksPart(name, fileName, headers, List.copyOf(partChunks)));
partChunks.forEach(Content.Chunk::release);
partChunks.clear();
}
}

@Override
public void onComplete()
{
if (failure != null)
return;
failure = cause;
partsToFail = List.copyOf(parts);
parts.clear();
partChunks.forEach(Content.Chunk::release);
partChunks.clear();
super.onComplete();
List<MultiPart.Part> copy;
try (AutoLock ignored = lock.lock())
{
copy = List.copyOf(parts);
Parser.this.parts = new Parts(getBoundary(), copy);
}
}

@Override
public void onFailure(Throwable failure)
{
fail(failure);
}

private void fail(Throwable cause)
{
List<MultiPart.Part> partsToFail;
try (AutoLock ignored = lock.lock())
{
if (failure != null)
return;
failure = cause;
partsToFail = List.copyOf(parts);
parts.clear();
partChunks.forEach(Content.Chunk::release);
partChunks.clear();
}
partsToFail.forEach(p -> p.fail(cause));
}
partsToFail.forEach(p -> p.fail(cause));
}
}
}
Loading

0 comments on commit ec2dbe7

Please sign in to comment.