Skip to content

Commit

Permalink
correcting jdk logic wrt interceptors and subscriptions
Browse files Browse the repository at this point in the history
also making jetty consume apply to each message, and removing the okhttp
requirement for a newline
  • Loading branch information
shawkins authored and manusa committed Jun 9, 2022
1 parent 9a0b973 commit 65b7900
Show file tree
Hide file tree
Showing 14 changed files with 160 additions and 128 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

/**
* TODO:
Expand All @@ -57,6 +58,9 @@ private final class AsyncBodySubscriber<T> implements Subscriber<T>, AsyncBody {
private CompletableFuture<Void> done = new CompletableFuture<Void>();
private final AtomicBoolean subscribed = new AtomicBoolean();
private volatile Flow.Subscription subscription;
private T initialItem;
private boolean first = true;
private boolean isComplete;

private AsyncBodySubscriber(BodyConsumer<T> consumer) {
this.consumer = consumer;
Expand All @@ -69,11 +73,20 @@ public void onSubscribe(Subscription subscription) {
return;
}
this.subscription = subscription;
// the sendAsync future won't complete unless we do the initial request here
// so in onNext we'll trap the item until we're ready
subscription.request(1);
}

@Override
public void onNext(T item) {
synchronized (this) {
if (first) {
this.initialItem = item;
first = false;
return;
}
}
try {
if (item == null) {
done.complete(null);
Expand All @@ -92,13 +105,32 @@ public void onError(Throwable throwable) {
}

@Override
public void onComplete() {
public synchronized void onComplete() {
if (initialItem != null) {
this.isComplete = true;
return;
}
done.complete(null);
}

@Override
public void consume() {
this.subscription.request(1);
public synchronized void consume() {
if (done.isDone()) {
return;
}
try {
first = false;
if (initialItem != null) {
T item = initialItem;
initialItem = null;
onNext(item);
}
} finally {
if (isComplete) {
done.complete(null);
}
this.subscription.request(1);
}
}

@Override
Expand Down Expand Up @@ -160,6 +192,26 @@ public Optional<HttpResponse<?>> previousResponse() {

}

static class AsyncResponse<T> {
java.net.http.HttpResponse<T> response;
AsyncBody asyncBody;

public AsyncResponse(java.net.http.HttpResponse<T> response, AsyncBody asyncBody) {
this.response = response;
this.asyncBody = asyncBody;
}
}

static class HandlerAndAsyncBody<T> {
BodyHandler<T> handler;
AsyncBody asyncBody;

public HandlerAndAsyncBody(BodyHandler<T> handler, AsyncBody asyncBody) {
this.handler = handler;
this.asyncBody = asyncBody;
}
}

private JdkHttpClientBuilderImpl builder;
private java.net.http.HttpClient httpClient;

Expand All @@ -185,20 +237,29 @@ public DerivedClientBuilder newBuilder() {

@Override
public CompletableFuture<HttpResponse<AsyncBody>> consumeLines(HttpRequest request, BodyConsumer<String> consumer) {
AsyncBodySubscriber<String> subscriber = new AsyncBodySubscriber<>(consumer);
BodyHandler<Void> handler = BodyHandlers.fromLineSubscriber(subscriber);
return sendAsync(request, handler).thenApply(r -> new JdkHttpResponseImpl<AsyncBody>(r, subscriber));
return sendAsync(request, () -> {
AsyncBodySubscriber<String> subscriber = new AsyncBodySubscriber<>(consumer);
BodyHandler<Void> handler = BodyHandlers.fromLineSubscriber(subscriber);
return new HandlerAndAsyncBody<>(handler, subscriber);
}).thenApply(r -> new JdkHttpResponseImpl<AsyncBody>(r.response, r.asyncBody));
}

@Override
public CompletableFuture<HttpResponse<AsyncBody>> consumeBytes(HttpRequest request, BodyConsumer<List<ByteBuffer>> consumer) {
AsyncBodySubscriber<List<ByteBuffer>> subscriber = new AsyncBodySubscriber<>(consumer);
BodyHandler<Void> handler = BodyHandlers.fromSubscriber(subscriber);
return sendAsync(request, handler).thenApply(r -> new JdkHttpResponseImpl<AsyncBody>(r, subscriber));
return sendAsync(request, () -> {
AsyncBodySubscriber<List<ByteBuffer>> subscriber = new AsyncBodySubscriber<>(consumer);
BodyHandler<Void> handler = BodyHandlers.fromSubscriber(subscriber);
return new HandlerAndAsyncBody<>(handler, subscriber);
}).thenApply(r -> new JdkHttpResponseImpl<AsyncBody>(r.response, r.asyncBody));
}

@Override
public <T> CompletableFuture<HttpResponse<T>> sendAsync(HttpRequest request, Class<T> type) {
return sendAsync(request, () -> new HandlerAndAsyncBody<T>(toBodyHandler(type), null))
.thenApply(ar -> new JdkHttpResponseImpl<>(ar.response));
}

private <T> BodyHandler<T> toBodyHandler(Class<T> type) {
BodyHandler<T> bodyHandler;
if (type == null) {
bodyHandler = (BodyHandler<T>) BodyHandlers.discarding();
Expand All @@ -218,31 +279,38 @@ public <T> CompletableFuture<HttpResponse<T>> sendAsync(HttpRequest request, Cla
return (BodySubscriber<T>) downstream;
};
}
return sendAsync(request, bodyHandler).thenApply(JdkHttpResponseImpl::new);
return bodyHandler;
}

public <T> CompletableFuture<java.net.http.HttpResponse<T>> sendAsync(HttpRequest request, BodyHandler<T> bodyHandler) {
public <T> CompletableFuture<AsyncResponse<T>> sendAsync(HttpRequest request,
Supplier<HandlerAndAsyncBody<T>> handlerAndAsyncBodySupplier) {
JdkHttpRequestImpl jdkRequest = (JdkHttpRequestImpl) request;
JdkHttpRequestImpl.BuilderImpl builderImpl = jdkRequest.newBuilder();
for (Interceptor interceptor : builder.interceptors.values()) {
interceptor.before(builderImpl, jdkRequest);
jdkRequest = builderImpl.build();
}

CompletableFuture<java.net.http.HttpResponse<T>> cf = this.getHttpClient().sendAsync(builderImpl.build().request,
bodyHandler);
HandlerAndAsyncBody<T> handlerAndAsyncBody = handlerAndAsyncBodySupplier.get();

CompletableFuture<AsyncResponse<T>> cf = this.getHttpClient().sendAsync(builderImpl.build().request,
handlerAndAsyncBody.handler).thenApply(r -> new AsyncResponse<>(r, handlerAndAsyncBody.asyncBody));

for (Interceptor interceptor : builder.interceptors.values()) {
cf = cf.thenCompose(response -> {
cf = cf.thenCompose(ar -> {
java.net.http.HttpResponse<T> response = ar.response;
if (response != null && !HttpResponse.isSuccessful(response.statusCode())) {
return interceptor.afterFailure(builderImpl, new JdkHttpResponseImpl<>(response)).thenCompose(b -> {
if (b) {
return this.getHttpClient().sendAsync(builderImpl.build().request, bodyHandler);
HandlerAndAsyncBody<T> interceptedHandlerAndAsyncBody = handlerAndAsyncBodySupplier.get();

return this.getHttpClient().sendAsync(builderImpl.build().request, interceptedHandlerAndAsyncBody.handler)
.thenApply(r -> new AsyncResponse<>(r, interceptedHandlerAndAsyncBody.asyncBody));
}
return CompletableFuture.completedFuture(response);
return CompletableFuture.completedFuture(ar);
});
}
return CompletableFuture.completedFuture(response);
return CompletableFuture.completedFuture(ar);
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,34 +17,12 @@

import io.fabric8.kubernetes.client.http.AbstractAsyncBodyTest;
import io.fabric8.kubernetes.client.http.HttpClient;
import org.junit.jupiter.api.Disabled;


@SuppressWarnings("java:S2187")
public class JdkHttpClientAsyncBodyTest extends AbstractAsyncBodyTest {
public class JdkHttpClientAsyncBodyTest extends AbstractAsyncBodyTest {
@Override
protected HttpClient.Factory getHttpClientFactory() {
return new JdkHttpClientFactory();
}

// TODO: Check tests validate expected behavior
@Disabled("TODO: Check tests validate expected behavior")
@Override
public void consumeLinesProcessedAfterConsume() throws Exception {
super.consumeLinesProcessedAfterConsume();
}

// TODO: Check tests validate expected behavior
@Disabled("TODO: Check tests validate expected behavior")
@Override
public void consumeLinesNotProcessedIfCancelled() throws Exception {
super.consumeLinesNotProcessedIfCancelled();
}

// TODO: Check tests validate expected behavior
@Disabled("TODO: Check tests validate expected behavior")
@Override
public void consumeByteBufferLinesProcessedAfterConsume() throws Exception {
super.consumeByteBufferLinesProcessedAfterConsume();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

import io.fabric8.kubernetes.client.http.AbstractInterceptorTest;
import io.fabric8.kubernetes.client.http.HttpClient;
import org.junit.jupiter.api.Disabled;

@SuppressWarnings("java:S2187")
public class JdkHttpClientInterceptorTest extends AbstractInterceptorTest {
Expand All @@ -26,15 +25,4 @@ protected HttpClient.Factory getHttpClientFactory() {
return new JdkHttpClientFactory();
}

// TODO: Check implementation
@Disabled("TODO: Check implementation")
@Override
public void afterHttpFailureReplacesResponseInConsumeLines() {
}

// TODO: Check implementation
@Disabled("TODO: Check implementation")
@Override
public void afterHttpFailureReplacesResponseInConsumeBytes() {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import io.fabric8.kubernetes.client.http.HttpClient;

@SuppressWarnings("java:S2187")
public class JdkHttpClientPostTest extends AbstractHttpPostTest {
public class JdkHttpClientPostTest extends AbstractHttpPostTest {
@Override
protected HttpClient.Factory getHttpClientFactory() {
return new JdkHttpClientFactory();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@
import java.util.concurrent.TimeUnit;

@SuppressWarnings("unchecked")
public abstract class DerivedJettyHttpClientBuilder<T extends HttpClient.DerivedClientBuilder> implements HttpClient.DerivedClientBuilder {
public abstract class DerivedJettyHttpClientBuilder<T extends HttpClient.DerivedClientBuilder>
implements HttpClient.DerivedClientBuilder {

final JettyHttpClientFactory factory;
Duration readTimeout = Duration.ZERO;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,27 +25,26 @@

import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;

public abstract class JettyAsyncResponseListener<T> extends Response.Listener.Adapter implements HttpClient.AsyncBody {

private final HttpRequest httpRequest;
private final HttpClient.BodyConsumer<T> bodyConsumer;
private final CompletableFuture<HttpResponse<HttpClient.AsyncBody>> asyncResponse;
private final CompletableFuture<Void> asyncBodyDone;
private final CountDownLatch consumeLock;
private boolean consume = false;

JettyAsyncResponseListener(HttpRequest httpRequest, HttpClient.BodyConsumer<T> bodyConsumer) {
this.httpRequest = httpRequest;
this.bodyConsumer = bodyConsumer;
asyncResponse = new CompletableFuture<>();
asyncBodyDone = new CompletableFuture<>();
consumeLock = new CountDownLatch(1);
}

@Override
public void consume() {
consumeLock.countDown();
public synchronized void consume() {
consume = true;
this.notifyAll();
}

@Override
Expand Down Expand Up @@ -76,7 +75,11 @@ public CompletableFuture<HttpResponse<HttpClient.AsyncBody>> listen(Request requ
@Override
public void onContent(Response response, ByteBuffer content) {
try {
consumeLock.await();
synchronized (this) {
while (!consume && !asyncBodyDone.isCancelled()) {
this.wait();
}
}
if (!asyncBodyDone.isCancelled()) {
bodyConsumer.consume(process(response, content), this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
import javax.net.ssl.TrustManager;

public class JettyHttpClientBuilder extends DerivedJettyHttpClientBuilder<JettyHttpClientBuilder>
implements Builder {
implements Builder {

private Duration connectTimeout;
private SSLContext sslContext;
Expand Down
Loading

0 comments on commit 65b7900

Please sign in to comment.