Skip to content

Commit

Permalink
Issue #4331 Close Complete
Browse files Browse the repository at this point in the history
Working async close complete!

Signed-off-by: Greg Wilkins <[email protected]>
  • Loading branch information
gregw committed Dec 5, 2019
1 parent 186164c commit 88398c8
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -297,12 +297,6 @@ public void close(Callback callback)
callback.succeeded();
return;

case CLOSING:
// Close already initiated, so just add the callback to those
// executed when it is complete.
_closeCallback = Callback.combine(_closeCallback, callback);
return;

case ERROR:
// TODO is this right?
Callback cb = Callback.combine(_closeCallback, callback);
Expand All @@ -311,16 +305,16 @@ public void close(Callback callback)
_state = State.CLOSED;
return;

case PENDING:
case UNREADY:
case CLOSING: // Close already initiated, so just add callback
case PENDING: // Add the callback and close when write is complete.
case UNREADY: // Add the callback and close when write is complete.
// Let's just add the callback so it get's noticed once write is possible.
_closeCallback = Callback.combine(_closeCallback, callback);
break;
return;

default:
_state = State.CLOSING;
_closeCallback = Callback.combine(_closeCallback, callback);
break;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Exchanger;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
Expand Down Expand Up @@ -170,19 +171,19 @@ public void onCompleted()
public static Stream<Arguments> tests()
{
List<Object[]> tests = new ArrayList<>();
tests.add(new Object[]{new HelloWorldHandler(), 200, "Hello world"});
tests.add(new Object[]{new SendErrorHandler(499, "Test async sendError"), 499, "Test async sendError"});
tests.add(new Object[]{new AsyncReadyCompleteHandler(), 200, __data});
tests.add(new Object[]{new AsyncWriteCompleteHandler(false, false), 200, __data});
tests.add(new Object[]{new AsyncWriteCompleteHandler(false, true), 200, __data});
tests.add(new Object[]{new AsyncWriteCompleteHandler(true, false), 200, __data});
tests.add(new Object[]{new AsyncWriteCompleteHandler(true, true), 200, __data});
tests.add(new Object[]{new HelloWorldHandler(), false, 200, "Hello world"});
tests.add(new Object[]{new SendErrorHandler(499, "Test async sendError"), false, 499, "Test async sendError"});
tests.add(new Object[]{new AsyncReadyCompleteHandler(), false, 200, __data});
tests.add(new Object[]{new AsyncWriteCompleteHandler(false, false), false, 200, __data});
tests.add(new Object[]{new AsyncWriteCompleteHandler(false, true), true, 200, __data});
tests.add(new Object[]{new AsyncWriteCompleteHandler(true, false), false, 200, __data});
tests.add(new Object[]{new AsyncWriteCompleteHandler(true, true), true, 200, __data});
return tests.stream().map(Arguments::of);
}

@ParameterizedTest
@MethodSource("tests")
public void testAsyncCompletion(Handler handler, int status, String message) throws Exception
public void testAsyncCompletion(Handler handler, boolean blocked, int status, String message) throws Exception
{
configureServer(handler);

Expand Down Expand Up @@ -210,7 +211,7 @@ public void testAsyncCompletion(Handler handler, int status, String message) thr

// wait for threads to return to base level
long end = System.nanoTime() + TimeUnit.SECONDS.toNanos(10);
while (_threadPool.getBusyThreads() != base)
while (_threadPool.getBusyThreads() != base + (blocked ? 1 : 0))
{
if (System.nanoTime() > end)
throw new TimeoutException();
Expand All @@ -220,6 +221,14 @@ public void testAsyncCompletion(Handler handler, int status, String message) thr
// We are now asynchronously waiting!
assertThat(__complete.get(), is(false));

// Do we need to wait for an unready state?
if (handler instanceof AsyncWriteCompleteHandler)
{
AsyncWriteCompleteHandler awch = (AsyncWriteCompleteHandler)handler;
if (awch._unReady)
assertThat(awch._unReadySeen.await(5, TimeUnit.SECONDS),is(true));
}

// proceed with the completion
delay.proceed();

Expand Down Expand Up @@ -276,6 +285,8 @@ private static class AsyncWriteCompleteHandler extends AbstractHandler
{
final boolean _unReady;
final boolean _close;
final CountDownLatch _unReadySeen = new CountDownLatch(1);
boolean _written;

AsyncWriteCompleteHandler(boolean unReady, boolean close)
{
Expand All @@ -295,13 +306,23 @@ public void onWritePossible() throws IOException
{
if (out.isReady())
{
response.setContentType("text/plain");
response.setContentLength(bytes.length);
out.write(bytes);
if (_unReady)
assertThat(out.isReady(),Matchers.is(false));
if (!_written)
{
_written = true;
response.setContentType("text/plain");
response.setContentLength(bytes.length);
out.write(bytes);
}
if (_unReady && _unReadySeen.getCount() == 1)
{
assertThat(out.isReady(), Matchers.is(false));
_unReadySeen.countDown();
return;
}
if (_close)
{
out.close();
}
context.complete();
}
}
Expand All @@ -313,5 +334,11 @@ public void onError(Throwable t)
}
});
}

@Override
public String toString()
{
return String.format("%s@%x{ur=%b,c=%b}", this.getClass().getSimpleName(), hashCode(), _unReady, _close);
}
}
}

0 comments on commit 88398c8

Please sign in to comment.