Skip to content

Commit

Permalink
fix fabric8io#4201: ensuring okhttp is tolerant to multiple consume c…
Browse files Browse the repository at this point in the history
…alls

also converting jetty to fully non-blocking

and adapting the body delivery for jdk for use with sendAsync logic - we
need the AsyncBody to be available immediately
  • Loading branch information
shawkins committed Oct 6, 2022
1 parent ed64589 commit 130d42a
Show file tree
Hide file tree
Showing 8 changed files with 147 additions and 91 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,19 @@
import java.net.URI;
import java.net.http.HttpResponse.BodyHandler;
import java.net.http.HttpResponse.BodyHandlers;
import java.net.http.HttpResponse.BodySubscriber;
import java.net.http.HttpResponse.ResponseInfo;
import java.net.http.WebSocketHandshakeException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Flow;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

Expand All @@ -47,48 +49,80 @@
*/
public class JdkHttpClientImpl implements HttpClient {

/**
* Adapts the BodyHandler to immediately complete the body
*/
private static final class BodyHandlerAdapter implements BodyHandler<AsyncBody> {
private final AsyncBodySubscriber<?> subscriber;
private final BodyHandler<Void> handler;

private BodyHandlerAdapter(AsyncBodySubscriber<?> subscriber, BodyHandler<Void> handler) {
this.subscriber = subscriber;
this.handler = handler;
}

@Override
public BodySubscriber<AsyncBody> apply(ResponseInfo responseInfo) {
BodySubscriber<Void> bodySubscriber = handler.apply(responseInfo);
return new BodySubscriber<AsyncBody>() {
CompletableFuture<AsyncBody> cf = CompletableFuture.completedFuture(subscriber);

@Override
public void onSubscribe(Subscription subscription) {
bodySubscriber.onSubscribe(subscription);
}

@Override
public void onNext(List<ByteBuffer> item) {
bodySubscriber.onNext(item);
}

@Override
public void onError(Throwable throwable) {
bodySubscriber.onError(throwable);
}

@Override
public void onComplete() {
bodySubscriber.onComplete();
}

@Override
public CompletionStage<AsyncBody> getBody() {
return cf;
}
};
}
}

private final class AsyncBodySubscriber<T> implements Subscriber<T>, AsyncBody {
private final BodyConsumer<T> consumer;
private CompletableFuture<Void> done = new CompletableFuture<Void>();
private final AtomicBoolean subscribed = new AtomicBoolean();
private volatile Flow.Subscription subscription;
private T initialItem;
private boolean first = true;
private boolean isComplete;
private CompletableFuture<Flow.Subscription> subscription = new CompletableFuture<>();

private AsyncBodySubscriber(BodyConsumer<T> consumer) {
this.consumer = consumer;
}

@Override
public void onSubscribe(Subscription subscription) {
if (!subscribed.compareAndSet(false, true)) {
if (this.subscription.isDone()) {
subscription.cancel();
return;
}
this.subscription = subscription;
// the sendAsync future won't complete unless we do the initial request here
// so in onNext we'll trap the item until we're ready
subscription.request(1);
this.subscription.complete(subscription);
}

@Override
public void onNext(T item) {
synchronized (this) {
if (first) {
this.initialItem = item;
first = false;
return;
}
}
try {
if (item == null) {
done.complete(null);
} else {
consumer.consume(item, this);
}
} catch (Exception e) {
subscription.cancel();
subscription.thenAccept(Subscription::cancel);
done.completeExceptionally(e);
}
}
Expand All @@ -100,10 +134,6 @@ public void onError(Throwable throwable) {

@Override
public synchronized void onComplete() {
if (initialItem != null) {
this.isComplete = true;
return;
}
done.complete(null);
}

Expand All @@ -112,19 +142,7 @@ public synchronized void consume() {
if (done.isDone()) {
return;
}
try {
first = false;
if (initialItem != null) {
T item = initialItem;
initialItem = null;
onNext(item);
}
} finally {
if (isComplete) {
done.complete(null);
}
this.subscription.request(1);
}
this.subscription.thenAccept(s -> s.request(1));
}

@Override
Expand All @@ -134,7 +152,7 @@ public CompletableFuture<Void> done() {

@Override
public void cancel() {
subscription.cancel();
subscription.thenAccept(Subscription::cancel);
done.cancel(false);
}

Expand Down Expand Up @@ -234,7 +252,8 @@ public CompletableFuture<HttpResponse<AsyncBody>> consumeLines(HttpRequest reque
return sendAsync(request, () -> {
AsyncBodySubscriber<String> subscriber = new AsyncBodySubscriber<>(consumer);
BodyHandler<Void> handler = BodyHandlers.fromLineSubscriber(subscriber);
return new HandlerAndAsyncBody<>(handler, subscriber);
BodyHandler<AsyncBody> handlerAdapter = new BodyHandlerAdapter(subscriber, handler);
return new HandlerAndAsyncBody<>(handlerAdapter, subscriber);
}).thenApply(r -> new JdkHttpResponseImpl<AsyncBody>(r.response, r.asyncBody));
}

Expand All @@ -243,7 +262,8 @@ public CompletableFuture<HttpResponse<AsyncBody>> consumeBytes(HttpRequest reque
return sendAsync(request, () -> {
AsyncBodySubscriber<List<ByteBuffer>> subscriber = new AsyncBodySubscriber<>(consumer);
BodyHandler<Void> handler = BodyHandlers.fromSubscriber(subscriber);
return new HandlerAndAsyncBody<>(handler, subscriber);
BodyHandler<AsyncBody> handlerAdapter = new BodyHandlerAdapter(subscriber, handler);
return new HandlerAndAsyncBody<>(handlerAdapter, subscriber);
}).thenApply(r -> new JdkHttpResponseImpl<AsyncBody>(r.response, r.asyncBody));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,37 +15,47 @@
*/
package io.fabric8.kubernetes.client.jetty;

import io.fabric8.kubernetes.client.KubernetesClientException;
import io.fabric8.kubernetes.client.http.HttpClient;
import io.fabric8.kubernetes.client.http.HttpRequest;
import io.fabric8.kubernetes.client.http.HttpResponse;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.util.Callback;

import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import java.util.function.LongConsumer;

public abstract class JettyAsyncResponseListener<T> extends Response.Listener.Adapter implements HttpClient.AsyncBody {

private final HttpRequest httpRequest;
private final HttpClient.BodyConsumer<T> bodyConsumer;
private final CompletableFuture<HttpResponse<HttpClient.AsyncBody>> asyncResponse;
private final CompletableFuture<Void> asyncBodyDone;
private boolean consume;
private CompletableFuture<LongConsumer> demand = new CompletableFuture<>();
private boolean initialConsumeCalled;
private Runnable initialConsume;

JettyAsyncResponseListener(HttpRequest httpRequest, HttpClient.BodyConsumer<T> bodyConsumer) {
this.httpRequest = httpRequest;
this.bodyConsumer = bodyConsumer;
asyncResponse = new CompletableFuture<>();
asyncBodyDone = new CompletableFuture<>();
consume = false;
}

@Override
public synchronized void consume() {
consume = true;
this.notifyAll();
public void consume() {
synchronized (this) {
if (!this.initialConsumeCalled) {
this.initialConsumeCalled = true;
if (this.initialConsume != null) {
this.initialConsume.run();
this.initialConsume = null;
}
}
}
demand.thenAccept(l -> l.accept(1));
}

@Override
Expand Down Expand Up @@ -74,21 +84,20 @@ public CompletableFuture<HttpResponse<HttpClient.AsyncBody>> listen(Request requ
}

@Override
public void onContent(Response response, ByteBuffer content) {
try {
synchronized (this) {
while (!consume && !asyncBodyDone.isCancelled()) {
this.wait();
}
}
if (!asyncBodyDone.isCancelled()) {
bodyConsumer.consume(process(response, content), this);
public void onContent(Response response, LongConsumer demand, ByteBuffer content, Callback callback) {
synchronized (this) {
if (!initialConsumeCalled) {
// defer until consume is called
this.initialConsume = () -> onContent(response, demand, content, callback);
return;
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw KubernetesClientException.launderThrowable(e);
this.demand.complete(demand);
}
try {
bodyConsumer.consume(process(response, content), this);
callback.succeeded();
} catch (Exception e) {
throw KubernetesClientException.launderThrowable(e);
callback.failed(e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,16 +72,21 @@ public void consume() {
// consume should not block from a callers perspective
try {
httpClient.dispatcher().executorService().execute(() -> {
try {
if (!source.exhausted() && !done.isDone()) {
T value = process(source);
consumer.consume(value, this);
} else {
done.complete(null);
// we must serialize multiple consumes as source is not thread-safe
// it would be better to use SerialExecutor, but that would need to move modules, as to
// potentially not hold multiple executor threads
synchronized (source) {
try {
if (!source.exhausted() && !done.isDone()) {
T value = process(source);
consumer.consume(value, this);
} else {
done.complete(null);
}
} catch (Exception e) {
Utils.closeQuietly(source);
done.completeExceptionally(e);
}
} catch (Exception e) {
Utils.closeQuietly(source);
done.completeExceptionally(e);
}
});
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,15 @@
import java.io.InputStream;
import java.io.Reader;
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
Expand All @@ -53,8 +56,8 @@ protected HttpClient.Factory getHttpClientFactory() {
return new OkHttpClientFactory();
}

private KubernetesMockServer server;
private KubernetesClient client;
KubernetesMockServer server;
KubernetesClient client;

@BeforeEach
void setUp() {
Expand Down Expand Up @@ -148,15 +151,17 @@ public void onMessage(WebSocket webSocket, String text) {

@Test
void testAsyncBody() throws Exception {
server.expect().withPath("/async").andReturn(200, "hello world").always();
int byteCount = 20000;
server.expect().withPath("/async").andReturn(200, new String(new byte[byteCount], StandardCharsets.UTF_8)).always();

CompletableFuture<Boolean> consumed = new CompletableFuture<>();
CompletableFuture<Integer> consumed = new CompletableFuture<>();
AtomicInteger total = new AtomicInteger();

CompletableFuture<HttpResponse<AsyncBody>> responseFuture = client.getHttpClient().consumeBytes(
client.getHttpClient().newHttpRequestBuilder().uri(URI.create(client.getConfiguration().getMasterUrl() + "async"))
.build(),
(value, asyncBody) -> {
consumed.complete(true);
value.stream().map(ByteBuffer::remaining).forEach(total::addAndGet);
asyncBody.consume();
});

Expand All @@ -169,15 +174,14 @@ void testAsyncBody() throws Exception {
r.body().done().whenComplete((v, ex) -> {
if (ex != null) {
consumed.completeExceptionally(ex);
}
if (v != null) {
consumed.complete(false);
} else {
consumed.complete(total.get());
}
});
}
});

assertTrue(consumed.get(5, TimeUnit.SECONDS));
assertEquals(byteCount, consumed.get(5, TimeUnit.SECONDS));
}

@Test
Expand Down Expand Up @@ -219,14 +223,15 @@ void testConsumeLines() throws Exception {
@ParameterizedTest(name = "{index}: {0}")
@ValueSource(classes = { String.class, byte[].class, Reader.class, InputStream.class })
void testSupportedTypes(Class<?> type) throws Exception {
server.expect().withPath("/type").andReturn(200, "hello world").always();
String value = new String(new byte[16384]);
server.expect().withPath("/type").andReturn(200, value).always();
final HttpResponse<?> result = client.getHttpClient()
.sendAsync(client.getHttpClient().newHttpRequestBuilder()
.uri(URI.create(client.getConfiguration().getMasterUrl() + "type")).build(), type)
.get(10, TimeUnit.SECONDS);
assertThat(result)
.satisfies(r -> assertThat(r.body()).isInstanceOf(type))
.satisfies(r -> assertThat(r.bodyString()).isEqualTo("hello world"));
.satisfies(r -> assertThat(r.bodyString()).isEqualTo(value));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
public class ByteArrayBodyHandler implements BodyConsumer<List<ByteBuffer>> {

private final LinkedList<ByteBuffer> buffers = new LinkedList<>();
private final CompletableFuture<byte[]> result = new CompletableFuture<byte[]>();
private final CompletableFuture<byte[]> result = new CompletableFuture<>();

@Override
public synchronized void consume(List<ByteBuffer> value, AsyncBody asyncBody) throws Exception {
Expand Down
Loading

0 comments on commit 130d42a

Please sign in to comment.