Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Experiment with a fully async ContentSourceCompletableFuture #9975

Merged
merged 20 commits into from
Jun 30, 2023
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
4a35731
Experiment with a fully async ContentSourceCompletableFuture
gregw Jun 26, 2023
86a08c5
Experiment with a fully async ContentSourceCompletableFuture
gregw Jun 27, 2023
82001de
Experiment with a fully async ContentSourceCompletableFuture
gregw Jun 27, 2023
d046fd8
Experiment with a fully async ContentSourceCompletableFuture
gregw Jun 27, 2023
ea2e7c9
Merge branch 'jetty-12.0.x' into experiment/jetty-12-ContentSourceCom…
gregw Jun 28, 2023
564b594
Experiment with a fully async ContentSourceCompletableFuture
gregw Jun 28, 2023
0067bf1
Experiment with a fully async ContentSourceCompletableFuture
gregw Jun 28, 2023
3727473
Experiment with a fully async ContentSourceCompletableFuture
gregw Jun 29, 2023
bc7bd39
Experiment with a fully async ContentSourceCompletableFuture
gregw Jun 29, 2023
17d8bb1
Experiment with a fully async ContentSourceCompletableFuture
gregw Jun 29, 2023
39cfdbd
Experiment with a fully async ContentSourceCompletableFuture
gregw Jun 29, 2023
1d71509
Experiment with a fully async ContentSourceCompletableFuture
gregw Jun 29, 2023
7b615e0
Experiment with a fully async ContentSourceCompletableFuture
gregw Jun 29, 2023
5eaf3d7
Experiment with a fully async ContentSourceCompletableFuture
gregw Jun 29, 2023
78be33d
Experiment with a fully async ContentSourceCompletableFuture
gregw Jun 29, 2023
0a739fa
Experiment with a fully async ContentSourceCompletableFuture
gregw Jun 30, 2023
91209b4
Experiment with a fully async ContentSourceCompletableFuture
gregw Jun 30, 2023
1b47039
Experiment with a fully async ContentSourceCompletableFuture
gregw Jun 30, 2023
dff0202
Experiment with a fully async ContentSourceCompletableFuture
gregw Jun 30, 2023
e45daca
Improved javadocs of ContentSourceCompletableFuture.
sbordet Jun 30, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,18 @@

package org.eclipse.jetty.docs.programming;

import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;

import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.io.content.AsyncContent;
import org.eclipse.jetty.io.content.ContentSourceCompletableFuture;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.CharsetStringBuilder;
import org.eclipse.jetty.util.FutureCallback;
import org.eclipse.jetty.util.Utf8StringBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -146,8 +153,94 @@ public static void testEcho() throws Exception
throw new IllegalStateException("EOF expected");
}

public static class FutureString extends CompletableFuture<String>
{
private final CharsetStringBuilder text;
private final Content.Source source;

public FutureString(Content.Source source, Charset charset)
{
this.source = source;
this.text = CharsetStringBuilder.forCharset(charset);
source.demand(this::onContentAvailable);
}

private void onContentAvailable()
{
while (true)
{
Content.Chunk chunk = source.read();
if (chunk == null)
{
source.demand(this::onContentAvailable);
return;
}

try
{
if (Content.Chunk.isFailure(chunk))
throw chunk.getFailure();

if (chunk.hasRemaining())
text.append(chunk.getByteBuffer());

if (chunk.isLast() && complete(text.build()))
return;
}
catch (Throwable e)
{
completeExceptionally(e);
}
finally
{
chunk.release();
}
}
}
}

public static void testFutureString() throws Exception
{
AsyncContent source = new AsyncContent();
FutureString future = new FutureString(source, StandardCharsets.UTF_8);
if (future.isDone())
throw new IllegalStateException();

Callback.Completable writeCallback = new Callback.Completable();
Content.Sink.write(source, false, "One", writeCallback);
if (!writeCallback.isDone() || future.isDone())
throw new IllegalStateException("Should be consumed");
Content.Sink.write(source, false, "Two", writeCallback);
if (!writeCallback.isDone() || future.isDone())
throw new IllegalStateException("Should be consumed");
Content.Sink.write(source, true, "Three", writeCallback);
if (!writeCallback.isDone() || !future.isDone())
throw new IllegalStateException("Should be consumed");

System.err.println(future.get());
gregw marked this conversation as resolved.
Show resolved Hide resolved
}

public static class FutureUtf8String extends ContentSourceCompletableFuture<String>
{
Utf8StringBuilder builder = new Utf8StringBuilder();
gregw marked this conversation as resolved.
Show resolved Hide resolved

public FutureUtf8String(Content.Source content)
{
super(content);
}

@Override
protected String parse(Content.Chunk chunk) throws Throwable
{
if (chunk.hasRemaining())
builder.append(chunk.getByteBuffer());
return chunk.isLast() ? builder.takeCompleteString(IllegalStateException::new) : null;
}
}

public static void main(String... args) throws Exception
{
testEcho();
testFutureString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -439,12 +439,12 @@ public boolean handle(Request request, Response response, Callback callback) thr
String contentType = request.getHeaders().get(HttpHeader.CONTENT_TYPE);
assertEquals("multipart/form-data", HttpField.valueParameters(contentType, null));
String boundary = MultiPart.extractBoundary(contentType);
MultiPartFormData formData = new MultiPartFormData(boundary);
MultiPartFormData.Parser formData = new MultiPartFormData.Parser(boundary);
formData.setFilesDirectory(tmpDir);
formData.parse(request);

try
{
process(formData.join()); // May block waiting for multipart form data.
process(formData.parse(request).join()); // May block waiting for multipart form data.
response.write(true, BufferUtil.EMPTY_BUFFER, callback);
}
catch (Exception x)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@
import java.util.concurrent.CompletableFuture;

import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.io.content.ContentSourceCompletableFuture;
import org.eclipse.jetty.util.thread.AutoLock;

/**
* <p>A {@link CompletableFuture} that is completed when a multipart/byteranges
* content has been parsed asynchronously from a {@link Content.Source} via
* {@link #parse(Content.Source)}.</p>
* has been parsed asynchronously from a {@link Content.Source}.</p>
* <p>Once the parsing of the multipart/byteranges content completes successfully,
* objects of this class are completed with a {@link MultiPartByteRanges.Parts}
* object.</p>
Expand All @@ -54,73 +54,8 @@
*/
public class MultiPartByteRanges extends CompletableFuture<MultiPartByteRanges.Parts>
gregw marked this conversation as resolved.
Show resolved Hide resolved
{
private final PartsListener listener = new PartsListener();
private final MultiPart.Parser parser;

public MultiPartByteRanges(String boundary)
{
this.parser = new MultiPart.Parser(boundary, listener);
}

/**
* @return the boundary string
*/
public String getBoundary()
{
return parser.getBoundary();
}

@Override
public boolean completeExceptionally(Throwable failure)
{
listener.fail(failure);
return super.completeExceptionally(failure);
}

/**
* <p>Parses the given multipart/byteranges content.</p>
* <p>Returns this {@code MultiPartByteRanges} object,
* so that it can be used in the typical "fluent" style
* of {@link CompletableFuture}.</p>
*
* @param content the multipart/byteranges content to parse
* @return this {@code MultiPartByteRanges} object
*/
public MultiPartByteRanges parse(Content.Source content)
{
new Runnable()
{
@Override
public void run()
{
while (true)
{
Content.Chunk chunk = content.read();
if (chunk == null)
{
content.demand(this);
return;
}
if (Content.Chunk.isFailure(chunk))
{
listener.onFailure(chunk.getFailure());
return;
}
parse(chunk);
chunk.release();
if (chunk.isLast() || isDone())
return;
}
}
}.run();
return this;
}

private void parse(Content.Chunk chunk)
private MultiPartByteRanges()
{
if (listener.isFailed())
return;
parser.parse(chunk);
}

/**
Expand Down Expand Up @@ -267,76 +202,123 @@ public Content.Source newContentSource()
}
}

private class PartsListener extends MultiPart.AbstractPartsListener
public static class Parser
{
private final AutoLock lock = new AutoLock();
private final List<Content.Chunk> partChunks = new ArrayList<>();
private final List<MultiPart.Part> parts = new ArrayList<>();
private Throwable failure;
private final PartsListener listener = new PartsListener();
private final MultiPart.Parser parser;
private Parts parts;

private boolean isFailed()
public Parser(String boundary)
{
try (AutoLock ignored = lock.lock())
{
return failure != null;
}
parser = new MultiPart.Parser(boundary, listener);
}

@Override
public void onPartContent(Content.Chunk chunk)
public CompletableFuture<MultiPartByteRanges.Parts> parse(Content.Source content)
{
try (AutoLock ignored = lock.lock())
ContentSourceCompletableFuture<MultiPartByteRanges.Parts> futureParts = new ContentSourceCompletableFuture<>(content)
{
// Retain the chunk because it is stored for later use.
chunk.retain();
partChunks.add(chunk);
}
@Override
protected MultiPartByteRanges.Parts parse(Content.Chunk chunk) throws Throwable
{
if (listener.isFailed())
throw listener.failure;
parser.parse(chunk);
if (listener.isFailed())
throw listener.failure;
return parts;
}

@Override
public boolean completeExceptionally(Throwable failure)
{
boolean failed = super.completeExceptionally(failure);
if (failed)
listener.fail(failure);
return failed;
}
};
futureParts.parse();
return futureParts;
}

@Override
public void onPart(String name, String fileName, HttpFields headers)
/**
* @return the boundary string
*/
public String getBoundary()
{
try (AutoLock ignored = lock.lock())
{
parts.add(new MultiPart.ChunksPart(name, fileName, headers, List.copyOf(partChunks)));
partChunks.forEach(Content.Chunk::release);
partChunks.clear();
}
return parser.getBoundary();
}

@Override
public void onComplete()
private class PartsListener extends MultiPart.AbstractPartsListener
{
super.onComplete();
List<MultiPart.Part> copy;
try (AutoLock ignored = lock.lock())
private final AutoLock lock = new AutoLock();
private final List<Content.Chunk> partChunks = new ArrayList<>();
private final List<MultiPart.Part> parts = new ArrayList<>();
private Throwable failure;

private boolean isFailed()
{
copy = List.copyOf(parts);
try (AutoLock ignored = lock.lock())
{
return failure != null;
}
}
complete(new Parts(getBoundary(), copy));
}

@Override
public void onFailure(Throwable failure)
{
super.onFailure(failure);
completeExceptionally(failure);
}
@Override
public void onPartContent(Content.Chunk chunk)
{
try (AutoLock ignored = lock.lock())
{
// Retain the chunk because it is stored for later use.
chunk.retain();
partChunks.add(chunk);
}
}

private void fail(Throwable cause)
{
List<MultiPart.Part> partsToFail;
try (AutoLock ignored = lock.lock())
@Override
public void onPart(String name, String fileName, HttpFields headers)
{
try (AutoLock ignored = lock.lock())
{
parts.add(new MultiPart.ChunksPart(name, fileName, headers, List.copyOf(partChunks)));
partChunks.forEach(Content.Chunk::release);
partChunks.clear();
}
}

@Override
public void onComplete()
{
if (failure != null)
return;
failure = cause;
partsToFail = List.copyOf(parts);
parts.clear();
partChunks.forEach(Content.Chunk::release);
partChunks.clear();
super.onComplete();
List<MultiPart.Part> copy;
try (AutoLock ignored = lock.lock())
{
copy = List.copyOf(parts);
Parser.this.parts = new Parts(getBoundary(), copy);
}
}

@Override
public void onFailure(Throwable failure)
{
fail(failure);
}

private void fail(Throwable cause)
{
List<MultiPart.Part> partsToFail;
try (AutoLock ignored = lock.lock())
{
if (failure != null)
return;
failure = cause;
partsToFail = List.copyOf(parts);
parts.clear();
partChunks.forEach(Content.Chunk::release);
partChunks.clear();
}
partsToFail.forEach(p -> p.fail(cause));
}
partsToFail.forEach(p -> p.fail(cause));
}
}
}
Loading