Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes #10145 - WritePendingException over HTTP/2 tunnel #10146

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,8 @@ public void shutdownOutput()
case OSHUTTING:
if (!writeState.compareAndSet(current, WriteState.OSHUT))
break;
stream.data(new DataFrame(stream.getId(), BufferUtil.EMPTY_BUFFER, true), Callback.from(this::oshutSuccess, this::oshutFailure));
Callback oshutCallback = Callback.from(Invocable.InvocationType.NON_BLOCKING, this::oshutSuccess, this::oshutFailure);
stream.data(new DataFrame(stream.getId(), BufferUtil.EMPTY_BUFFER, true), oshutCallback);
return;
case PENDING:
if (!writeState.compareAndSet(current, WriteState.OSHUTTING))
Expand Down Expand Up @@ -177,7 +178,7 @@ public void close(Throwable cause)
if (closed.compareAndSet(false, true))
{
if (LOG.isDebugEnabled())
LOG.debug("closing {}, cause: {}", this, cause);
LOG.debug("closing {}", this, cause);
shutdownOutput();
stream.close();
onClose(cause);
Expand All @@ -188,7 +189,7 @@ public void close(Throwable cause)
public int fill(ByteBuffer sink) throws IOException
{
Entry entry;
try (AutoLock l = lock.lock())
try (AutoLock ignored = lock.lock())
{
entry = dataQueue.poll();
}
Expand Down Expand Up @@ -222,7 +223,7 @@ public int fill(ByteBuffer sink) throws IOException

if (source.hasRemaining())
{
try (AutoLock l = lock.lock())
try (AutoLock ignored = lock.lock())
{
dataQueue.offerFirst(entry);
}
Expand All @@ -248,92 +249,27 @@ public boolean flush(ByteBuffer... buffers) throws IOException
{
if (LOG.isDebugEnabled())
LOG.debug("flushing {} on {}", BufferUtil.toDetailString(buffers), this);
if (buffers == null || buffers.length == 0)
{
if (buffers == null || buffers.length == 0 || remaining(buffers) == 0)
return true;
}
else
{
while (true)
{
WriteState current = writeState.get();
switch (current.state)
{
case IDLE:
if (!writeState.compareAndSet(current, WriteState.PENDING))
break;
// We must copy the buffers because, differently from
// write(), the semantic of flush() is that it does not
// own them, but stream.data() needs to own them.
ByteBuffer buffer = coalesce(buffers, true);
Callback.Completable callback = new Callback.Completable(Invocable.InvocationType.NON_BLOCKING);
stream.data(new DataFrame(stream.getId(), buffer, false), callback);
callback.whenComplete((nothing, failure) ->
{
if (failure == null)
flushSuccess();
else
flushFailure(failure);
});
return callback.isDone();
case PENDING:
return false;
case OSHUTTING:
case OSHUT:
throw new EofException("Output shutdown");
case FAILED:
Throwable failure = current.failure;
if (failure instanceof IOException)
throw (IOException)failure;
throw new IOException(failure);
}
}
}
}

private void flushSuccess()
{
while (true)
{
WriteState current = writeState.get();
switch (current.state)
{
case IDLE:
case OSHUT:
throw new IllegalStateException();
case PENDING:
if (!writeState.compareAndSet(current, WriteState.IDLE))
break;
return;
case OSHUTTING:
shutdownOutput();
return;
case FAILED:
return;
}
}
}

private void flushFailure(Throwable failure)
{
while (true)
WriteState current = writeState.get();
switch (current.state)
{
WriteState current = writeState.get();
switch (current.state)
{
case IDLE:
case OSHUT:
throw new IllegalStateException();
case PENDING:
if (!writeState.compareAndSet(current, new WriteState(WriteState.State.FAILED, failure)))
break;
return;
case OSHUTTING:
shutdownOutput();
return;
case FAILED:
return;
}
// The flush() semantic is that it must not leave pending operations, therefore we
// cannot call stream.data() because it would remain pending if the stream is congested.
case IDLE:
gregw marked this conversation as resolved.
Show resolved Hide resolved
case PENDING:
return false;
case OSHUTTING:
case OSHUT:
throw new EofException("Output shutdown");
case FAILED:
Throwable failure = current.failure;
if (failure instanceof IOException)
throw (IOException)failure;
throw new IOException(failure);
default:
throw new IllegalStateException("Unexpected state: " + current.state);
}
}

Expand Down Expand Up @@ -397,8 +333,9 @@ public void write(Callback callback, ByteBuffer... buffers) throws WritePendingE
if (!writeState.compareAndSet(current, WriteState.PENDING))
break;
// TODO: we really need a Stream primitive to write multiple frames.
ByteBuffer result = coalesce(buffers, false);
stream.data(new DataFrame(stream.getId(), result, false), Callback.from(() -> writeSuccess(callback), x -> writeFailure(x, callback)));
ByteBuffer result = coalesce(buffers);
Callback dataCallback = Callback.from(Invocable.getInvocationType(callback), () -> writeSuccess(callback), x -> writeFailure(x, callback));
stream.data(new DataFrame(stream.getId(), result, false), dataCallback);
return;
case PENDING:
callback.failed(new WritePendingException());
Expand All @@ -410,6 +347,9 @@ public void write(Callback callback, ByteBuffer... buffers) throws WritePendingE
case FAILED:
callback.failed(current.failure);
return;
default:
callback.failed(new IllegalStateException("Unexpected state: " + current.state));
return;
}
}
}
Expand Down Expand Up @@ -438,6 +378,9 @@ private void writeSuccess(Callback callback)
case FAILED:
callback.failed(current.failure);
return;
default:
callback.failed(new IllegalStateException("Unexpected state: " + current.state));
return;
}
}
}
Expand All @@ -461,23 +404,21 @@ private void writeFailure(Throwable failure, Callback callback)
return;
case FAILED:
return;
default:
callback.failed(new IllegalStateException("Unexpected state: " + current.state));
return;
}
}
}

private long remaining(ByteBuffer... buffers)
{
long total = 0;
for (ByteBuffer buffer : buffers)
{
total += buffer.remaining();
}
return total;
return BufferUtil.remaining(buffers);
}

private ByteBuffer coalesce(ByteBuffer[] buffers, boolean forceCopy)
private ByteBuffer coalesce(ByteBuffer[] buffers)
{
if (buffers.length == 1 && !forceCopy)
if (buffers.length == 1)
return buffers[0];
long capacity = remaining(buffers);
if (capacity > Integer.MAX_VALUE)
Expand Down Expand Up @@ -567,7 +508,7 @@ protected void offerFailure(Throwable failure)

private void offer(ByteBuffer buffer, Callback callback, Throwable failure)
{
try (AutoLock l = lock.lock())
try (AutoLock ignored = lock.lock())
{
dataQueue.offer(new Entry(buffer, callback, failure));
}
Expand All @@ -576,7 +517,7 @@ private void offer(ByteBuffer buffer, Callback callback, Throwable failure)
protected void process()
{
boolean empty;
try (AutoLock l = lock.lock())
try (AutoLock ignored = lock.lock())
{
empty = dataQueue.isEmpty();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ public void write(Callback callback, SocketAddress address, ByteBuffer... buffer
if (buffers != null)
{
if (DEBUG)
LOG.debug("flushed incomplete");
LOG.debug("flush incomplete {}", this);
PendingState pending = new PendingState(callback, address, buffers);
if (updateState(__WRITING, pending))
onIncompleteFlush();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

package org.eclipse.jetty.http.client;

import java.io.IOException;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
Expand All @@ -23,6 +24,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

Expand All @@ -36,7 +38,9 @@
import org.eclipse.jetty.client.api.Destination;
import org.eclipse.jetty.client.dynamic.HttpClientTransportDynamic;
import org.eclipse.jetty.client.http.HttpClientConnectionFactory;
import org.eclipse.jetty.client.util.ByteBufferRequestContent;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpScheme;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.HttpVersion;
Expand Down Expand Up @@ -73,6 +77,7 @@
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.FuturePromise;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.hamcrest.Matchers;
Expand Down Expand Up @@ -288,6 +293,58 @@ protected void service(String target, Request jettyRequest, HttpServletRequest r
assertEquals(1, connectionPool.getConnectionCount());
}

@ParameterizedTest(name = "proxyProtocol={0}, proxySecure={1}, serverProtocol={2}, serverSecure={3}")
@MethodSource("testParams")
public void testProxyConcurrentLoad(Origin.Protocol proxyProtocol, boolean proxySecure, HttpVersion serverProtocol, boolean serverSecure) throws Exception
{
start(new EmptyServerHandler()
{
@Override
protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
{
IO.copy(request.getInputStream(), response.getOutputStream());
}
});

int parallelism = 8;
boolean proxyMultiplexed = proxyProtocol.getProtocols().stream().allMatch(p -> p.startsWith("h2"));
client.setMaxConnectionsPerDestination(proxyMultiplexed ? 1 : parallelism);

int proxyPort = proxySecure ? proxyTLSConnector.getLocalPort() : proxyConnector.getLocalPort();
Origin.Address proxyAddress = new Origin.Address("localhost", proxyPort);
HttpProxy proxy = new HttpProxy(proxyAddress, proxySecure, proxyProtocol);
client.getProxyConfiguration().addProxy(proxy);

String scheme = serverSecure ? "https" : "http";
int serverPort = serverSecure ? serverTLSConnector.getLocalPort() : serverConnector.getLocalPort();
int contentLength = 128 * 1024;

int iterations = 16;
IntStream.range(0, parallelism).parallel().forEach(p ->
IntStream.range(0, iterations).forEach(i ->
{
try
{
String id = p + "-" + i;
ContentResponse response = client.newRequest("localhost", serverPort)
.scheme(scheme)
.method(HttpMethod.POST)
.path("/path/" + id)
.version(serverProtocol)
.body(new ByteBufferRequestContent(ByteBuffer.allocate(contentLength)))
.timeout(5, TimeUnit.SECONDS)
.send();

assertEquals(HttpStatus.OK_200, response.getStatus());
assertEquals(contentLength, response.getContent().length);
}
catch (Throwable x)
{
throw new RuntimeException(x);
}
}));
}

@Test
public void testHTTP2TunnelClosedByClient() throws Exception
{
Expand Down