Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduced [Callback|Promise]Completable.with(Consumer) to simplify u… #8620

Merged
merged 1 commit into from
Sep 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -244,9 +244,8 @@ private void assertParts(Map<String, List<MultiPart.Part>> allParts) throws Exce
List<MultiPart.Part> charSetParts = allParts.get("_charset_");
if (charSetParts != null)
{
Promise.Completable<String> promise = new Promise.Completable<>();
Content.Source.asString(charSetParts.get(0).getContent(), StandardCharsets.US_ASCII, promise);
defaultCharset = promise.get();
defaultCharset = Promise.Completable.<String>with(p -> Content.Source.asString(charSetParts.get(0).getContent(), StandardCharsets.US_ASCII, p))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Too bad we can't have Content.Source.asString(Source, Charset) that returns a CompletableFuture.

.get();
}

for (NameValue expected : partContainsContents)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -383,9 +383,7 @@ public void connect(SocketAddress address, Session.Listener listener, Promise<Se

public CompletableFuture<Session> connect(SslContextFactory sslContextFactory, SocketAddress address, Session.Listener listener)
{
Promise.Completable<Session> result = new Promise.Completable<>();
connect(sslContextFactory, address, listener, result);
return result;
return Promise.Completable.with(p -> connect(sslContextFactory, address, listener, p));
}

public void connect(SslContextFactory sslContextFactory, SocketAddress address, Session.Listener listener, Promise<Session> promise)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,7 @@ public interface Session
*/
public default CompletableFuture<Stream> newStream(HeadersFrame frame, Stream.Listener listener)
{
Promise.Completable<Stream> result = new Promise.Completable<>();
newStream(frame, result, listener);
return result;
return Promise.Completable.with(p -> newStream(frame, p, listener));
}

/**
Expand All @@ -87,6 +85,17 @@ public default CompletableFuture<Stream> newStream(HeadersFrame frame, Stream.Li
*/
public int priority(PriorityFrame frame, Callback callback);

/**
* <p>Sends the given SETTINGS {@code frame} to configure the session.</p>
*
* @param frame the SETTINGS frame to send
* @return a CompletableFuture that is notified when the frame has been sent
*/
public default CompletableFuture<Void> settings(SettingsFrame frame)
{
return Callback.Completable.with(c -> settings(frame, c));
}

/**
* <p>Sends the given SETTINGS {@code frame} to configure the session.</p>
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,7 @@ public interface Stream
*/
public default CompletableFuture<Stream> headers(HeadersFrame frame)
{
Promise.Completable<Stream> result = new Promise.Completable<>();
headers(frame, Callback.from(() -> result.succeeded(this), result::failed));
return result;
return Promise.Completable.with(p -> headers(frame, Callback.from(() -> p.succeeded(this), p::failed)));
}

/**
Expand All @@ -85,9 +83,7 @@ public default CompletableFuture<Stream> headers(HeadersFrame frame)
*/
public default CompletableFuture<Stream> push(PushPromiseFrame frame, Listener listener)
{
Promise.Completable<Stream> result = new Promise.Completable<>();
push(frame, result, listener);
return result;
return Promise.Completable.with(p -> push(frame, p, listener));
}

/**
Expand Down Expand Up @@ -139,9 +135,7 @@ public default CompletableFuture<Stream> push(PushPromiseFrame frame, Listener l
*/
public default CompletableFuture<Stream> data(DataFrame frame)
{
Promise.Completable<Stream> result = new Promise.Completable<>();
data(frame, Callback.from(() -> result.succeeded(this), result::failed));
return result;
return Promise.Completable.with(p -> data(frame, Callback.from(() -> p.succeeded(this), p::failed)));
}

/**
Expand All @@ -155,7 +149,18 @@ public default CompletableFuture<Stream> data(DataFrame frame)
/**
* <p>Sends the given RST_STREAM {@code frame}.</p>
*
* @param frame the RST_FRAME to send
* @param frame the RST_STREAM frame to send
* @return the CompletableFuture that gets notified when the frame has been sent
*/
public default CompletableFuture<Void> reset(ResetFrame frame)
{
return Callback.Completable.with(c -> reset(frame, c));
}

/**
* <p>Sends the given RST_STREAM {@code frame}.</p>
*
* @param frame the RST_STREAM frame to send
* @param callback the callback that gets notified when the frame has been sent
*/
public void reset(ResetFrame frame, Callback callback);
Expand Down Expand Up @@ -348,7 +353,7 @@ public default void onDataAvailable(Stream stream)
* <p>Callback method invoked when a RST_STREAM frame has been received for this stream.</p>
*
* @param stream the stream
* @param frame the RST_FRAME received
* @param frame the RST_STREAM frame received
* @param callback the callback to complete when the reset has been handled
*/
public default void onReset(Stream stream, ResetFrame frame, Callback callback)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,14 +289,13 @@ public void onSettings(Session session, SettingsFrame frame)
Stream stream = promise.get(5, TimeUnit.SECONDS);

// Send first chunk that exceeds the window.
Callback.Completable completable = new Callback.Completable();
stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(size * 2), false), completable);
CompletableFuture<Stream> completable = stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(size * 2), false));
settingsLatch.await(5, TimeUnit.SECONDS);

completable.thenRun(() ->
completable.thenAccept(s ->
{
// Send the second chunk of data, must not arrive since we're flow control stalled on the client.
stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(size * 2), true), Callback.NOOP);
s.data(new DataFrame(s.getId(), ByteBuffer.allocate(size * 2), true));
});

assertFalse(dataLatch.await(1, TimeUnit.SECONDS));
Expand Down Expand Up @@ -341,9 +340,8 @@ public Stream.Listener onNewStream(Stream stream, HeadersFrame requestFrame)

Map<Integer, Integer> settings = new HashMap<>();
settings.put(SettingsFrame.INITIAL_WINDOW_SIZE, windowSize);
Callback.Completable completable = new Callback.Completable();
session.settings(new SettingsFrame(settings, false), completable);
completable.thenRun(settingsLatch::countDown);
session.settings(new SettingsFrame(settings, false))
.thenRun(settingsLatch::countDown);

assertTrue(settingsLatch.await(5, TimeUnit.SECONDS));

Expand Down Expand Up @@ -641,13 +639,8 @@ public Stream.Listener onNewStream(Stream stream, HeadersFrame requestFrame)
{
MetaData.Response metaData = new MetaData.Response(HttpVersion.HTTP_2, 200, HttpFields.EMPTY);
HeadersFrame responseFrame = new HeadersFrame(stream.getId(), metaData, null, false);
Callback.Completable completable = new Callback.Completable();
stream.headers(responseFrame, completable);
completable.thenRun(() ->
{
DataFrame dataFrame = new DataFrame(stream.getId(), ByteBuffer.wrap(data), true);
stream.data(dataFrame, Callback.NOOP);
});
stream.headers(responseFrame)
.thenAccept(s -> s.data(new DataFrame(s.getId(), ByteBuffer.wrap(data), true)));
return null;
}
});
Expand Down Expand Up @@ -691,21 +684,21 @@ public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
{
MetaData metaData = new MetaData.Response(HttpVersion.HTTP_2, 200, HttpFields.EMPTY);
HeadersFrame responseFrame = new HeadersFrame(stream.getId(), metaData, null, false);
Callback.Completable completable = new Callback.Completable();
stream.headers(responseFrame, completable);
CompletableFuture<Stream> completable = stream.headers(responseFrame);
stream.demand();
return new Stream.Listener()
{
@Override
public void onDataAvailable(Stream stream)
{
Stream.Data data = stream.readData();
completable.thenRun(() -> stream.data(data.frame(), Callback.from(() ->
{
data.release();
if (!data.frame().isEndStream())
stream.demand();
})));
completable.thenAccept(s -> s.data(data.frame())
.whenComplete((r, x) ->
{
data.release();
if (!data.frame().isEndStream())
stream.demand();
}));
}
};
}
Expand All @@ -730,9 +723,8 @@ public Map<Integer, Integer> onPreface(Session session)
ByteBuffer responseContent = ByteBuffer.wrap(responseData);
MetaData.Request metaData = newRequest("GET", HttpFields.EMPTY);
HeadersFrame requestFrame = new HeadersFrame(metaData, null, false);
Promise.Completable<Stream> completable = new Promise.Completable<>();
CountDownLatch latch = new CountDownLatch(1);
session.newStream(requestFrame, completable, new Stream.Listener()
session.newStream(requestFrame, new Stream.Listener()
{
@Override
public void onDataAvailable(Stream stream)
Expand All @@ -745,12 +737,11 @@ public void onDataAvailable(Stream stream)
else
stream.demand();
}
});
completable.thenAccept(stream ->
})
.thenAccept(s ->
{
ByteBuffer requestContent = ByteBuffer.wrap(requestData);
DataFrame dataFrame = new DataFrame(stream.getId(), requestContent, true);
stream.data(dataFrame, Callback.NOOP);
s.data(new DataFrame(s.getId(), requestContent, true));
});

assertTrue(latch.await(5, TimeUnit.SECONDS));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,8 +222,7 @@ public void process(Request request, Response response, Callback callback)
CountDownLatch latch = new CountDownLatch(1);
MetaData.Request metaData = newRequest("POST", HttpFields.EMPTY);
HeadersFrame frame = new HeadersFrame(metaData, null, false);
Promise.Completable<Stream> streamCompletable = new Promise.Completable<>();
session.newStream(frame, streamCompletable, new Stream.Listener()
session.newStream(frame, new Stream.Listener()
{
@Override
public void onDataAvailable(Stream stream)
Expand All @@ -235,18 +234,9 @@ public void onDataAvailable(Stream stream)
else
stream.demand();
}
});
streamCompletable.thenCompose(stream ->
{
DataFrame dataFrame = new DataFrame(stream.getId(), ByteBuffer.allocate(512), false);
Callback.Completable dataCompletable = new Callback.Completable();
stream.data(dataFrame, dataCompletable);
return dataCompletable.thenApply(y -> stream);
}).thenAccept(stream ->
{
DataFrame dataFrame = new DataFrame(stream.getId(), ByteBuffer.allocate(1024), true);
stream.data(dataFrame, Callback.NOOP);
});
})
.thenCompose(s -> s.data(new DataFrame(s.getId(), ByteBuffer.allocate(512), false)))
.thenAccept(s -> s.data(new DataFrame(s.getId(), ByteBuffer.allocate(1024), true)));

assertTrue(latch.await(5, TimeUnit.SECONDS));
}
Expand Down Expand Up @@ -546,9 +536,8 @@ public void testInvalidAPIUsageOnClient() throws Exception
@Override
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
{
Callback.Completable completable = new Callback.Completable();
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, HttpFields.EMPTY);
stream.headers(new HeadersFrame(stream.getId(), response, null, false), completable);
CompletableFuture<Stream> completable = stream.headers(new HeadersFrame(stream.getId(), response, null, false));
stream.demand();
return new Stream.Listener()
{
Expand All @@ -559,11 +548,8 @@ public void onDataAvailable(Stream stream)
data.release();
if (data.frame().isEndStream())
{
completable.thenRun(() ->
{
DataFrame endFrame = new DataFrame(stream.getId(), BufferUtil.EMPTY_BUFFER, true);
stream.data(endFrame, Callback.NOOP);
});
completable.thenAccept(s ->
s.data(new DataFrame(s.getId(), BufferUtil.EMPTY_BUFFER, true)));
}
else
{
Expand All @@ -578,9 +564,8 @@ public void onDataAvailable(Stream stream)

MetaData.Request metaData = newRequest("GET", HttpFields.EMPTY);
HeadersFrame frame = new HeadersFrame(metaData, null, false);
Promise.Completable<Stream> completable = new Promise.Completable<>();
CountDownLatch completeLatch = new CountDownLatch(2);
session.newStream(frame, completable, new Stream.Listener()
Stream stream = session.newStream(frame, new Stream.Listener()
{
@Override
public void onDataAvailable(Stream stream)
Expand All @@ -592,8 +577,7 @@ public void onDataAvailable(Stream stream)
else
stream.demand();
}
});
Stream stream = completable.get(5, TimeUnit.SECONDS);
}).get(5, TimeUnit.SECONDS);

long sleep = 1000;
DataFrame data1 = new DataFrame(stream.getId(), ByteBuffer.allocate(1024), false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,15 +222,14 @@ public void testDelayDemandAfterHeaders() throws Exception
@Override
public void process(Request request, org.eclipse.jetty.server.Response response, Callback callback)
{
Callback.Completable completable = new Callback.Completable();
response.write(false, ByteBuffer.allocate(1), completable);
completable.whenComplete((r, x) ->
{
if (x != null)
callback.failed(x);
else
response.write(true, ByteBuffer.allocate(2), callback);
});
Callback.Completable.with(c -> response.write(false, ByteBuffer.allocate(1), c))
.whenComplete((r, x) ->
{
if (x != null)
callback.failed(x);
else
response.write(true, ByteBuffer.allocate(2), callback);
});
}
});

Expand Down Expand Up @@ -638,12 +637,10 @@ public void test204WithContent() throws Exception
@Override
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
{
int streamId = stream.getId();
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.NO_CONTENT_204, HttpFields.EMPTY);
HeadersFrame responseFrame = new HeadersFrame(streamId, response, null, false);
Callback.Completable callback = new Callback.Completable();
stream.headers(responseFrame, callback);
callback.thenRun(() -> stream.data(new DataFrame(streamId, ByteBuffer.wrap(bytes), true), Callback.NOOP));
HeadersFrame responseFrame = new HeadersFrame(stream.getId(), response, null, false);
stream.headers(responseFrame)
.thenAccept(s -> s.data(new DataFrame(s.getId(), ByteBuffer.wrap(bytes), true)));
return null;
}
});
Expand Down Expand Up @@ -671,12 +668,10 @@ public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
HttpFields fields = HttpFields.build()
.put(":method", "get");
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, fields, 0);
int streamId = stream.getId();
HeadersFrame responseFrame = new HeadersFrame(streamId, response, null, false);
Callback.Completable callback = new Callback.Completable();
stream.headers(responseFrame, callback);
HeadersFrame responseFrame = new HeadersFrame(stream.getId(), response, null, false);
byte[] bytes = "hello".getBytes(StandardCharsets.US_ASCII);
callback.thenRun(() -> stream.data(new DataFrame(streamId, ByteBuffer.wrap(bytes), true), Callback.NOOP));
stream.headers(responseFrame)
.thenAccept(s -> s.data(new DataFrame(s.getId(), ByteBuffer.wrap(bytes), true)));
return null;
}
});
Expand Down Expand Up @@ -705,9 +700,8 @@ public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
int streamId = stream.getId();
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, HttpFields.EMPTY);
HeadersFrame responseFrame = new HeadersFrame(streamId, response, null, false);
Callback.Completable callback = new Callback.Completable();
stream.headers(responseFrame, callback);
callback.thenRun(() -> stream.data(new DataFrame(streamId, ByteBuffer.wrap(new byte[bytes]), true), Callback.NOOP));
stream.headers(responseFrame)
.thenAccept(s -> s.data(new DataFrame(s.getId(), ByteBuffer.wrap(new byte[bytes]), true)));
return null;
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -559,20 +559,17 @@ public void succeeded(Stream stream)
session.newStream(requestFrame, promise, null);
Stream stream = promise.get(5, TimeUnit.SECONDS);

Callback.Completable completable1 = new Callback.Completable();
sleep(idleTimeout / 2);
stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(1), false), completable1);
completable1.thenCompose(nil ->
{
Callback.Completable completable2 = new Callback.Completable();
sleep(idleTimeout / 2);
stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(1), false), completable2);
return completable2;
}).thenRun(() ->
{
sleep(idleTimeout / 2);
stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(1), true), Callback.NOOP);
});
stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(1), false))
.thenCompose(s ->
{
sleep(idleTimeout / 2);
return s.data(new DataFrame(s.getId(), ByteBuffer.allocate(1), false));
}).thenAccept(s ->
{
sleep(idleTimeout / 2);
s.data(new DataFrame(s.getId(), ByteBuffer.allocate(1), true));
});

assertFalse(resetLatch.await(1, TimeUnit.SECONDS));
}
Expand Down
Loading