From 41daa3fe32ad2539a6b162504144e9f6e446e0a5 Mon Sep 17 00:00:00 2001 From: Greg Wilkins Date: Tue, 3 Dec 2019 14:31:09 +1100 Subject: [PATCH] Issue #4331 Async Close Complete Cleanups of write Signed-off-by: Greg Wilkins --- .../org/eclipse/jetty/server/HttpOutput.java | 122 +++++++++--------- .../eclipse/jetty/server/ResponseWriter.java | 2 +- 2 files changed, 59 insertions(+), 65 deletions(-) diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java index 5d37ae924a2b..6d9f2179e9fc 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java @@ -228,27 +228,25 @@ protected Blocker acquireWriteBlockingCallback() throws IOException return _writeBlocker.acquire(); } - private void write(ByteBuffer content, boolean complete) throws IOException + private void channelWrite(ByteBuffer content, boolean complete) throws IOException { try (Blocker blocker = _writeBlocker.acquire()) { - write(content, complete, blocker); + channelWrite(content, complete, blocker); blocker.block(); if (complete) closed(); } - catch (Exception failure) + catch (Throwable failure) { if (LOG.isDebugEnabled()) LOG.debug(failure); abort(failure); - if (failure instanceof IOException) - throw failure; - throw new IOException(failure); + throw failure; } } - protected void write(ByteBuffer content, boolean complete, Callback callback) + protected void channelWrite(ByteBuffer content, boolean complete, Callback callback) { if (_firstByteTimeStamp == -1) { @@ -373,14 +371,14 @@ public void close() if (closeCallback == BLOCKING_CLOSE_CALLBACK) { // Do a blocking close - write(content, !_channel.getResponse().isIncluding()); + channelWrite(content, !_channel.getResponse().isIncluding()); _closeCallback = null; closeCallback.succeeded(); } else { _closeCallback = null; - write(content, !_channel.getResponse().isIncluding(), closeCallback); + channelWrite(content, !_channel.getResponse().isIncluding(), closeCallback); } } catch (IOException x) @@ -489,7 +487,7 @@ public void flush() throws IOException switch (state) { case OPEN: - write(BufferUtil.hasContent(_aggregate) ? _aggregate : BufferUtil.EMPTY_BUFFER, false); + channelWrite(BufferUtil.hasContent(_aggregate) ? _aggregate : BufferUtil.EMPTY_BUFFER, false); return; case ASYNC: @@ -521,6 +519,15 @@ public void flush() throws IOException @Override public void write(byte[] b, int off, int len) throws IOException { + long written = _written + len; + boolean last = _channel.getResponse().isAllContentWritten(written); + int space = _aggregate == null ? getBufferSize() : BufferUtil.space(_aggregate); + // Write will be aggregated if: + // + it is smaller than the commitSize + // + is not the last one, or is last but will fit in an already allocated aggregate buffer. + boolean aggregate = len <= _commitSize && (!last || BufferUtil.hasContent(_aggregate) && len <= space); + boolean flush = last || !aggregate || len >= space; + // Async or Blocking ? while (true) { @@ -528,32 +535,27 @@ public void write(byte[] b, int off, int len) throws IOException switch (state) { case OPEN: - // process blocking below + // process blocking write below + _written = written; break; case ASYNC: throw new IllegalStateException("isReady() not called"); case READY: - if (!_state.compareAndSet(state, State.PENDING)) + if (!_state.compareAndSet(state, flush ? State.PENDING : State.ASYNC)) continue; + _written = written; // Should we aggregate? - boolean last = isLastContentToWrite(len); - if (!last && len <= _commitSize) + if (aggregate) { acquireBuffer(); - - // YES - fill the aggregate with content from the buffer int filled = BufferUtil.fill(_aggregate, b, off, len); // return if we are not complete, not full and filled all the content - if (filled == len && !BufferUtil.isFull(_aggregate)) - { - if (!_state.compareAndSet(State.PENDING, State.ASYNC)) - throw new IllegalStateException(_state.get().toString()); + if (!flush) return; - } // adjust offset/length off += filled; @@ -582,20 +584,13 @@ public void write(byte[] b, int off, int len) throws IOException } // handle blocking write - - // Should we aggregate? - // Yes - if the write is smaller than the commitSize (==aggregate buffer size) - // and the write is not the last one, or is last but will fit in an already allocated aggregate buffer. - boolean last = isLastContentToWrite(len); - if (len <= _commitSize && (!last || len <= BufferUtil.space(_aggregate))) + if (aggregate) { acquireBuffer(); - - // YES - fill the aggregate with content from the buffer int filled = BufferUtil.fill(_aggregate, b, off, len); - // return if we are not the last write and have aggregated all of the content - if (!last && filled == len && !BufferUtil.isFull(_aggregate)) + // return if we are not complete, not full and filled all the content + if (!flush) return; // adjust offset/length @@ -606,7 +601,7 @@ public void write(byte[] b, int off, int len) throws IOException // flush any content from the aggregate if (BufferUtil.hasContent(_aggregate)) { - write(_aggregate, last && len == 0); + channelWrite(_aggregate, last && len == 0); // should we fill aggregate again from the buffer? if (len > 0 && !last && len <= _commitSize && len <= BufferUtil.space(_aggregate)) @@ -622,21 +617,20 @@ public void write(byte[] b, int off, int len) throws IOException // write a buffer capacity at a time to avoid JVM pooling large direct buffers // http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6210541 ByteBuffer view = ByteBuffer.wrap(b, off, len); + while (len > getBufferSize()) { int p = view.position(); - int l = p + getBufferSize(); view.limit(p + getBufferSize()); - write(view, false); + channelWrite(view, false); + view.limit(p + len); len -= getBufferSize(); - view.limit(l + Math.min(len, getBufferSize())); - view.position(l); } - write(view, last); + channelWrite(view, last); } else if (last) { - write(BufferUtil.EMPTY_BUFFER, true); + channelWrite(BufferUtil.EMPTY_BUFFER, true); } } @@ -689,20 +683,22 @@ public void write(ByteBuffer buffer) throws IOException // flush any content from the aggregate if (BufferUtil.hasContent(_aggregate)) - write(_aggregate, last && len == 0); + channelWrite(_aggregate, last && len == 0); // write any remaining content in the buffer directly if (len > 0) - write(buffer, last); + channelWrite(buffer, last); else if (last) - write(BufferUtil.EMPTY_BUFFER, true); + channelWrite(BufferUtil.EMPTY_BUFFER, true); } @Override public void write(int b) throws IOException { - _written += 1; - boolean complete = _channel.getResponse().isAllContentWritten(_written); + long written = _written + 1; + boolean last = _channel.getResponse().isAllContentWritten(written); + int space = _aggregate == null ? getBufferSize() : BufferUtil.space(_aggregate); + boolean flush = last || space == 1; // Async or Blocking ? while (true) @@ -710,34 +706,32 @@ public void write(int b) throws IOException switch (_state.get()) { case OPEN: + _written = written; acquireBuffer(); BufferUtil.append(_aggregate, (byte)b); // Check if all written or full - if (complete || BufferUtil.isFull(_aggregate)) - write(_aggregate, complete); + if (flush) + channelWrite(_aggregate, last); break; case ASYNC: throw new IllegalStateException("isReady() not called"); case READY: - if (!_state.compareAndSet(State.READY, State.PENDING)) + if (!_state.compareAndSet(State.READY, flush ? State.PENDING : State.ASYNC)) continue; + _written = written; acquireBuffer(); BufferUtil.append(_aggregate, (byte)b); // Check if all written or full - if (!complete && !BufferUtil.isFull(_aggregate)) + if (flush) { - if (!_state.compareAndSet(State.PENDING, State.ASYNC)) - throw new IllegalStateException(); - return; + // Do the asynchronous writing from the callback + new AsyncFlush().iterate(); } - - // Do the asynchronous writing from the callback - new AsyncFlush().iterate(); return; case PENDING: @@ -884,7 +878,7 @@ public void sendContent(ByteBuffer content) throws IOException LOG.debug("sendContent({})", BufferUtil.toDetailString(content)); _written += content.remaining(); - write(content, true); + channelWrite(content, true); } /** @@ -965,7 +959,7 @@ public void sendContent(ByteBuffer content, final Callback callback) LOG.debug("sendContent(buffer={},{})", BufferUtil.toDetailString(content), callback); _written += content.remaining(); - write(content, true, new Callback.Nested(callback) + channelWrite(content, true, new Callback.Nested(callback) { @Override public void succeeded() @@ -1319,7 +1313,7 @@ protected void onCompleteSuccess() if (_last) closed(); if (_channel.getState().onWritePossible()) - _channel.execute(_channel); + _channel.execute(_channel); // TODO can we call directly? Why execute? break; case CLOSED: @@ -1356,14 +1350,14 @@ protected Action process() if (BufferUtil.hasContent(_aggregate)) { _flushed = true; - write(_aggregate, false, this); + channelWrite(_aggregate, false, this); return Action.SCHEDULED; } if (!_flushed) { _flushed = true; - write(BufferUtil.EMPTY_BUFFER, false, this); + channelWrite(BufferUtil.EMPTY_BUFFER, false, this); return Action.SCHEDULED; } @@ -1408,7 +1402,7 @@ protected Action process() if (BufferUtil.hasContent(_aggregate)) { _completed = _len == 0; - write(_aggregate, _last && _completed, this); + channelWrite(_aggregate, _last && _completed, this); return Action.SCHEDULED; } @@ -1428,7 +1422,7 @@ protected Action process() if (_slice == null) { _completed = true; - write(_buffer, _last, this); + channelWrite(_buffer, _last, this); return Action.SCHEDULED; } @@ -1440,7 +1434,7 @@ protected Action process() _buffer.position(pl); _slice.position(p); _completed = !_buffer.hasRemaining(); - write(_slice, _last && _completed, this); + channelWrite(_slice, _last && _completed, this); return Action.SCHEDULED; } @@ -1449,7 +1443,7 @@ protected Action process() if (_last && !_completed) { _completed = true; - write(BufferUtil.EMPTY_BUFFER, true, this); + channelWrite(BufferUtil.EMPTY_BUFFER, true, this); return Action.SCHEDULED; } @@ -1511,7 +1505,7 @@ protected Action process() throws Exception _buffer.position(0); _buffer.limit(len); _written += len; - write(_buffer, _eof, this); + channelWrite(_buffer, _eof, this); return Action.SCHEDULED; } @@ -1572,7 +1566,7 @@ protected Action process() throws Exception // write what we have BufferUtil.flipToFlush(_buffer, 0); _written += _buffer.remaining(); - write(_buffer, _eof, this); + channelWrite(_buffer, _eof, this); return Action.SCHEDULED; } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/ResponseWriter.java b/jetty-server/src/main/java/org/eclipse/jetty/server/ResponseWriter.java index 094f98f1cbca..ee0f0977ad3e 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/ResponseWriter.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/ResponseWriter.java @@ -148,7 +148,7 @@ public void flush() out.flush(); } } - catch (IOException ex) + catch (Throwable ex) { setError(ex); }