From 65b7900349e4eac20066f22567c5e92c768ba900 Mon Sep 17 00:00:00 2001 From: Steve Hawkins Date: Wed, 8 Jun 2022 14:53:41 -0400 Subject: [PATCH] correcting jdk logic wrt interceptors and subscriptions also making jetty consume apply to each message, and removing the okhttp requirement for a newline --- .../client/jdkhttp/JdkHttpClientImpl.java | 102 +++++++++++++++--- .../jdkhttp/JdkHttpClientAsyncBodyTest.java | 24 +---- .../jdkhttp/JdkHttpClientInterceptorTest.java | 12 --- .../client/jdkhttp/JdkHttpClientPostTest.java | 2 +- .../jetty/DerivedJettyHttpClientBuilder.java | 3 +- .../jetty/JettyAsyncResponseListener.java | 15 +-- .../client/jetty/JettyHttpClientBuilder.java | 2 +- .../jetty/JettyHttpClientBuilderTest.java | 77 +++++++------ .../client/jetty/JettyHttpClientTest.java | 6 +- .../client/jetty/JettyWebSocketTest.java | 2 +- .../client/okhttp/OkHttpClientImpl.java | 2 +- .../client/okhttp/OkHttpInterceptorTest.java | 13 --- .../client/http/AbstractAsyncBodyTest.java | 9 +- .../client/http/AbstractInterceptorTest.java | 19 ++-- 14 files changed, 160 insertions(+), 128 deletions(-) diff --git a/httpclient-jdk/src/main/java/io/fabric8/kubernetes/client/jdkhttp/JdkHttpClientImpl.java b/httpclient-jdk/src/main/java/io/fabric8/kubernetes/client/jdkhttp/JdkHttpClientImpl.java index 168f22df6ef..e68d1110d6b 100644 --- a/httpclient-jdk/src/main/java/io/fabric8/kubernetes/client/jdkhttp/JdkHttpClientImpl.java +++ b/httpclient-jdk/src/main/java/io/fabric8/kubernetes/client/jdkhttp/JdkHttpClientImpl.java @@ -44,6 +44,7 @@ import java.util.concurrent.Flow.Subscription; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Supplier; /** * TODO: @@ -57,6 +58,9 @@ private final class AsyncBodySubscriber implements Subscriber, AsyncBody { private CompletableFuture done = new CompletableFuture(); private final AtomicBoolean subscribed = new AtomicBoolean(); private volatile Flow.Subscription subscription; + private T initialItem; + private boolean first = true; + private boolean isComplete; private AsyncBodySubscriber(BodyConsumer consumer) { this.consumer = consumer; @@ -69,11 +73,20 @@ public void onSubscribe(Subscription subscription) { return; } this.subscription = subscription; + // the sendAsync future won't complete unless we do the initial request here + // so in onNext we'll trap the item until we're ready subscription.request(1); } @Override public void onNext(T item) { + synchronized (this) { + if (first) { + this.initialItem = item; + first = false; + return; + } + } try { if (item == null) { done.complete(null); @@ -92,13 +105,32 @@ public void onError(Throwable throwable) { } @Override - public void onComplete() { + public synchronized void onComplete() { + if (initialItem != null) { + this.isComplete = true; + return; + } done.complete(null); } @Override - public void consume() { - this.subscription.request(1); + public synchronized void consume() { + if (done.isDone()) { + return; + } + try { + first = false; + if (initialItem != null) { + T item = initialItem; + initialItem = null; + onNext(item); + } + } finally { + if (isComplete) { + done.complete(null); + } + this.subscription.request(1); + } } @Override @@ -160,6 +192,26 @@ public Optional> previousResponse() { } + static class AsyncResponse { + java.net.http.HttpResponse response; + AsyncBody asyncBody; + + public AsyncResponse(java.net.http.HttpResponse response, AsyncBody asyncBody) { + this.response = response; + this.asyncBody = asyncBody; + } + } + + static class HandlerAndAsyncBody { + BodyHandler handler; + AsyncBody asyncBody; + + public HandlerAndAsyncBody(BodyHandler handler, AsyncBody asyncBody) { + this.handler = handler; + this.asyncBody = asyncBody; + } + } + private JdkHttpClientBuilderImpl builder; private java.net.http.HttpClient httpClient; @@ -185,20 +237,29 @@ public DerivedClientBuilder newBuilder() { @Override public CompletableFuture> consumeLines(HttpRequest request, BodyConsumer consumer) { - AsyncBodySubscriber subscriber = new AsyncBodySubscriber<>(consumer); - BodyHandler handler = BodyHandlers.fromLineSubscriber(subscriber); - return sendAsync(request, handler).thenApply(r -> new JdkHttpResponseImpl(r, subscriber)); + return sendAsync(request, () -> { + AsyncBodySubscriber subscriber = new AsyncBodySubscriber<>(consumer); + BodyHandler handler = BodyHandlers.fromLineSubscriber(subscriber); + return new HandlerAndAsyncBody<>(handler, subscriber); + }).thenApply(r -> new JdkHttpResponseImpl(r.response, r.asyncBody)); } @Override public CompletableFuture> consumeBytes(HttpRequest request, BodyConsumer> consumer) { - AsyncBodySubscriber> subscriber = new AsyncBodySubscriber<>(consumer); - BodyHandler handler = BodyHandlers.fromSubscriber(subscriber); - return sendAsync(request, handler).thenApply(r -> new JdkHttpResponseImpl(r, subscriber)); + return sendAsync(request, () -> { + AsyncBodySubscriber> subscriber = new AsyncBodySubscriber<>(consumer); + BodyHandler handler = BodyHandlers.fromSubscriber(subscriber); + return new HandlerAndAsyncBody<>(handler, subscriber); + }).thenApply(r -> new JdkHttpResponseImpl(r.response, r.asyncBody)); } @Override public CompletableFuture> sendAsync(HttpRequest request, Class type) { + return sendAsync(request, () -> new HandlerAndAsyncBody(toBodyHandler(type), null)) + .thenApply(ar -> new JdkHttpResponseImpl<>(ar.response)); + } + + private BodyHandler toBodyHandler(Class type) { BodyHandler bodyHandler; if (type == null) { bodyHandler = (BodyHandler) BodyHandlers.discarding(); @@ -218,10 +279,11 @@ public CompletableFuture> sendAsync(HttpRequest request, Cla return (BodySubscriber) downstream; }; } - return sendAsync(request, bodyHandler).thenApply(JdkHttpResponseImpl::new); + return bodyHandler; } - public CompletableFuture> sendAsync(HttpRequest request, BodyHandler bodyHandler) { + public CompletableFuture> sendAsync(HttpRequest request, + Supplier> handlerAndAsyncBodySupplier) { JdkHttpRequestImpl jdkRequest = (JdkHttpRequestImpl) request; JdkHttpRequestImpl.BuilderImpl builderImpl = jdkRequest.newBuilder(); for (Interceptor interceptor : builder.interceptors.values()) { @@ -229,20 +291,26 @@ public CompletableFuture> sendAsync(HttpReques jdkRequest = builderImpl.build(); } - CompletableFuture> cf = this.getHttpClient().sendAsync(builderImpl.build().request, - bodyHandler); + HandlerAndAsyncBody handlerAndAsyncBody = handlerAndAsyncBodySupplier.get(); + + CompletableFuture> cf = this.getHttpClient().sendAsync(builderImpl.build().request, + handlerAndAsyncBody.handler).thenApply(r -> new AsyncResponse<>(r, handlerAndAsyncBody.asyncBody)); for (Interceptor interceptor : builder.interceptors.values()) { - cf = cf.thenCompose(response -> { + cf = cf.thenCompose(ar -> { + java.net.http.HttpResponse response = ar.response; if (response != null && !HttpResponse.isSuccessful(response.statusCode())) { return interceptor.afterFailure(builderImpl, new JdkHttpResponseImpl<>(response)).thenCompose(b -> { if (b) { - return this.getHttpClient().sendAsync(builderImpl.build().request, bodyHandler); + HandlerAndAsyncBody interceptedHandlerAndAsyncBody = handlerAndAsyncBodySupplier.get(); + + return this.getHttpClient().sendAsync(builderImpl.build().request, interceptedHandlerAndAsyncBody.handler) + .thenApply(r -> new AsyncResponse<>(r, interceptedHandlerAndAsyncBody.asyncBody)); } - return CompletableFuture.completedFuture(response); + return CompletableFuture.completedFuture(ar); }); } - return CompletableFuture.completedFuture(response); + return CompletableFuture.completedFuture(ar); }); } diff --git a/httpclient-jdk/src/test/java/io/fabric8/kubernetes/client/jdkhttp/JdkHttpClientAsyncBodyTest.java b/httpclient-jdk/src/test/java/io/fabric8/kubernetes/client/jdkhttp/JdkHttpClientAsyncBodyTest.java index b1935142275..03416ebca15 100644 --- a/httpclient-jdk/src/test/java/io/fabric8/kubernetes/client/jdkhttp/JdkHttpClientAsyncBodyTest.java +++ b/httpclient-jdk/src/test/java/io/fabric8/kubernetes/client/jdkhttp/JdkHttpClientAsyncBodyTest.java @@ -17,34 +17,12 @@ import io.fabric8.kubernetes.client.http.AbstractAsyncBodyTest; import io.fabric8.kubernetes.client.http.HttpClient; -import org.junit.jupiter.api.Disabled; - @SuppressWarnings("java:S2187") -public class JdkHttpClientAsyncBodyTest extends AbstractAsyncBodyTest { +public class JdkHttpClientAsyncBodyTest extends AbstractAsyncBodyTest { @Override protected HttpClient.Factory getHttpClientFactory() { return new JdkHttpClientFactory(); } - // TODO: Check tests validate expected behavior - @Disabled("TODO: Check tests validate expected behavior") - @Override - public void consumeLinesProcessedAfterConsume() throws Exception { - super.consumeLinesProcessedAfterConsume(); - } - - // TODO: Check tests validate expected behavior - @Disabled("TODO: Check tests validate expected behavior") - @Override - public void consumeLinesNotProcessedIfCancelled() throws Exception { - super.consumeLinesNotProcessedIfCancelled(); - } - - // TODO: Check tests validate expected behavior - @Disabled("TODO: Check tests validate expected behavior") - @Override - public void consumeByteBufferLinesProcessedAfterConsume() throws Exception { - super.consumeByteBufferLinesProcessedAfterConsume(); - } } diff --git a/httpclient-jdk/src/test/java/io/fabric8/kubernetes/client/jdkhttp/JdkHttpClientInterceptorTest.java b/httpclient-jdk/src/test/java/io/fabric8/kubernetes/client/jdkhttp/JdkHttpClientInterceptorTest.java index 5dfe39ee11b..a4ec3fd086f 100644 --- a/httpclient-jdk/src/test/java/io/fabric8/kubernetes/client/jdkhttp/JdkHttpClientInterceptorTest.java +++ b/httpclient-jdk/src/test/java/io/fabric8/kubernetes/client/jdkhttp/JdkHttpClientInterceptorTest.java @@ -17,7 +17,6 @@ import io.fabric8.kubernetes.client.http.AbstractInterceptorTest; import io.fabric8.kubernetes.client.http.HttpClient; -import org.junit.jupiter.api.Disabled; @SuppressWarnings("java:S2187") public class JdkHttpClientInterceptorTest extends AbstractInterceptorTest { @@ -26,15 +25,4 @@ protected HttpClient.Factory getHttpClientFactory() { return new JdkHttpClientFactory(); } - // TODO: Check implementation - @Disabled("TODO: Check implementation") - @Override - public void afterHttpFailureReplacesResponseInConsumeLines() { - } - - // TODO: Check implementation - @Disabled("TODO: Check implementation") - @Override - public void afterHttpFailureReplacesResponseInConsumeBytes() { - } } diff --git a/httpclient-jdk/src/test/java/io/fabric8/kubernetes/client/jdkhttp/JdkHttpClientPostTest.java b/httpclient-jdk/src/test/java/io/fabric8/kubernetes/client/jdkhttp/JdkHttpClientPostTest.java index 68a5e36148a..b93e414918d 100644 --- a/httpclient-jdk/src/test/java/io/fabric8/kubernetes/client/jdkhttp/JdkHttpClientPostTest.java +++ b/httpclient-jdk/src/test/java/io/fabric8/kubernetes/client/jdkhttp/JdkHttpClientPostTest.java @@ -19,7 +19,7 @@ import io.fabric8.kubernetes.client.http.HttpClient; @SuppressWarnings("java:S2187") -public class JdkHttpClientPostTest extends AbstractHttpPostTest { +public class JdkHttpClientPostTest extends AbstractHttpPostTest { @Override protected HttpClient.Factory getHttpClientFactory() { return new JdkHttpClientFactory(); diff --git a/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/DerivedJettyHttpClientBuilder.java b/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/DerivedJettyHttpClientBuilder.java index dcb0a8e3aeb..62febd62757 100644 --- a/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/DerivedJettyHttpClientBuilder.java +++ b/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/DerivedJettyHttpClientBuilder.java @@ -24,7 +24,8 @@ import java.util.concurrent.TimeUnit; @SuppressWarnings("unchecked") -public abstract class DerivedJettyHttpClientBuilder implements HttpClient.DerivedClientBuilder { +public abstract class DerivedJettyHttpClientBuilder + implements HttpClient.DerivedClientBuilder { final JettyHttpClientFactory factory; Duration readTimeout = Duration.ZERO; diff --git a/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyAsyncResponseListener.java b/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyAsyncResponseListener.java index b71a02433f2..28e7c2bee79 100644 --- a/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyAsyncResponseListener.java +++ b/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyAsyncResponseListener.java @@ -25,7 +25,6 @@ import java.nio.ByteBuffer; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CountDownLatch; public abstract class JettyAsyncResponseListener extends Response.Listener.Adapter implements HttpClient.AsyncBody { @@ -33,19 +32,19 @@ public abstract class JettyAsyncResponseListener extends Response.Listener.Ad private final HttpClient.BodyConsumer bodyConsumer; private final CompletableFuture> asyncResponse; private final CompletableFuture asyncBodyDone; - private final CountDownLatch consumeLock; + private boolean consume = false; JettyAsyncResponseListener(HttpRequest httpRequest, HttpClient.BodyConsumer bodyConsumer) { this.httpRequest = httpRequest; this.bodyConsumer = bodyConsumer; asyncResponse = new CompletableFuture<>(); asyncBodyDone = new CompletableFuture<>(); - consumeLock = new CountDownLatch(1); } @Override - public void consume() { - consumeLock.countDown(); + public synchronized void consume() { + consume = true; + this.notifyAll(); } @Override @@ -76,7 +75,11 @@ public CompletableFuture> listen(Request requ @Override public void onContent(Response response, ByteBuffer content) { try { - consumeLock.await(); + synchronized (this) { + while (!consume && !asyncBodyDone.isCancelled()) { + this.wait(); + } + } if (!asyncBodyDone.isCancelled()) { bodyConsumer.consume(process(response, content), this); } diff --git a/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyHttpClientBuilder.java b/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyHttpClientBuilder.java index 965896705a4..92e9a062f26 100644 --- a/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyHttpClientBuilder.java +++ b/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyHttpClientBuilder.java @@ -42,7 +42,7 @@ import javax.net.ssl.TrustManager; public class JettyHttpClientBuilder extends DerivedJettyHttpClientBuilder - implements Builder { + implements Builder { private Duration connectTimeout; private SSLContext sslContext; diff --git a/httpclient-jetty/src/test/java/io/fabric8/kubernetes/client/jetty/JettyHttpClientBuilderTest.java b/httpclient-jetty/src/test/java/io/fabric8/kubernetes/client/jetty/JettyHttpClientBuilderTest.java index d3b894c2321..240b6ce9463 100644 --- a/httpclient-jetty/src/test/java/io/fabric8/kubernetes/client/jetty/JettyHttpClientBuilderTest.java +++ b/httpclient-jetty/src/test/java/io/fabric8/kubernetes/client/jetty/JettyHttpClientBuilderTest.java @@ -56,12 +56,11 @@ static void afterAll() { void underlyingHttpAndWsClientsDifferentTransports() { try (var client = factory.newBuilder().build()) { assertThat(client) - .satisfies(c -> assertThat(c.getJetty()) - .isNotSameAs(c.getJettyWs().getHttpClient()) - .returns(c.getJettyWs().getSslContextFactory(), HttpClient::getSslContextFactory) - .extracting(HttpClient::getTransport) - .isNotSameAs(c.getJettyWs().getHttpClient().getTransport()) - ); + .satisfies(c -> assertThat(c.getJetty()) + .isNotSameAs(c.getJettyWs().getHttpClient()) + .returns(c.getJettyWs().getSslContextFactory(), HttpClient::getSslContextFactory) + .extracting(HttpClient::getTransport) + .isNotSameAs(c.getJettyWs().getHttpClient().getTransport())); } } @@ -70,9 +69,9 @@ void underlyingHttpAndWsClientsDifferentTransports() { void generatedWSClientHasDisabledIdleTimeout() { try (var client = factory.newBuilder().build()) { assertThat(client) - .extracting(JettyHttpClient::getJettyWs) - .extracting(WebSocketClient::getIdleTimeout) - .isEqualTo(Duration.ZERO); + .extracting(JettyHttpClient::getJettyWs) + .extracting(WebSocketClient::getIdleTimeout) + .isEqualTo(Duration.ZERO); } } @@ -82,11 +81,11 @@ void buildClientBuilderBuildShareWsAndHttpClients() { try (var client = factory.newBuilder().build()) { final var client2 = client.newBuilder().build(); assertThat(client2) - .isInstanceOf(JettyHttpClient.class) - .asInstanceOf(InstanceOfAssertFactories.type(JettyHttpClient.class)) - .isNotSameAs(client) - .returns(client.getJetty(), JettyHttpClient::getJetty) - .returns(client.getJettyWs(), JettyHttpClient::getJettyWs); + .isInstanceOf(JettyHttpClient.class) + .asInstanceOf(InstanceOfAssertFactories.type(JettyHttpClient.class)) + .isNotSameAs(client) + .returns(client.getJetty(), JettyHttpClient::getJetty) + .returns(client.getJettyWs(), JettyHttpClient::getJettyWs); } } @@ -95,8 +94,8 @@ void buildClientBuilderBuildShareWsAndHttpClients() { void connectTimeout() { try (var client = factory.newBuilder().connectTimeout(1337, TimeUnit.MILLISECONDS).build()) { assertThat(client) - .returns(1337L, c -> c.getJetty().getConnectTimeout()) - .returns(1337L, c -> c.getJettyWs().getConnectTimeout()); + .returns(1337L, c -> c.getJetty().getConnectTimeout()) + .returns(1337L, c -> c.getJettyWs().getConnectTimeout()); } } @@ -104,19 +103,19 @@ void connectTimeout() { @DisplayName("followRedirects=false, no redirection") void followAllRedirectsDisabled() throws Exception { server.expect() - .withPath("/redirect-me") - .andReply(ResponseProviders.of(301, "", Collections.singletonMap("Location", "/new-location"))) - .always(); + .withPath("/redirect-me") + .andReply(ResponseProviders.of(301, "", Collections.singletonMap("Location", "/new-location"))) + .always(); server.expect() - .withPath("/new-location") - .andReturn(200, "You made it!") - .always(); + .withPath("/new-location") + .andReturn(200, "You made it!") + .always(); try (var client = factory.newBuilder().build()) { final var result = client - .sendAsync(client.newHttpRequestBuilder().uri(server.url("redirect-me")).build(), String.class) + .sendAsync(client.newHttpRequestBuilder().uri(server.url("redirect-me")).build(), String.class) .get(10, TimeUnit.SECONDS); assertThat(result) - .returns(301, HttpResponse::code); + .returns(301, HttpResponse::code); } } @@ -124,26 +123,26 @@ void followAllRedirectsDisabled() throws Exception { @DisplayName("followAllRedirects=true, redirected") void followAllRedirectsEnabled() throws Exception { server.expect() - .withPath("/redirect-me") - .andReply(ResponseProviders.of(301, "", Collections.singletonMap("Location", "/new-location"))) - .always(); + .withPath("/redirect-me") + .andReply(ResponseProviders.of(301, "", Collections.singletonMap("Location", "/new-location"))) + .always(); server.expect() - .withPath("/new-location") - .andReturn(200, "You made it!") - .always(); + .withPath("/new-location") + .andReturn(200, "You made it!") + .always(); try (var client = factory.newBuilder().followAllRedirects().build()) { final var result = client - .sendAsync(client.newHttpRequestBuilder().uri(server.url("redirect-me")).build(), String.class) + .sendAsync(client.newHttpRequestBuilder().uri(server.url("redirect-me")).build(), String.class) .get(10, TimeUnit.SECONDS); assertThat(result) - .returns(200, HttpResponse::code) - .returns("You made it!", r -> { - try { - return r.bodyString(); - } catch (IOException ignored) { - return null; - } - }); + .returns(200, HttpResponse::code) + .returns("You made it!", r -> { + try { + return r.bodyString(); + } catch (IOException ignored) { + return null; + } + }); } } diff --git a/httpclient-jetty/src/test/java/io/fabric8/kubernetes/client/jetty/JettyHttpClientTest.java b/httpclient-jetty/src/test/java/io/fabric8/kubernetes/client/jetty/JettyHttpClientTest.java index 2ce2e281b21..c701fa23fd3 100644 --- a/httpclient-jetty/src/test/java/io/fabric8/kubernetes/client/jetty/JettyHttpClientTest.java +++ b/httpclient-jetty/src/test/java/io/fabric8/kubernetes/client/jetty/JettyHttpClientTest.java @@ -76,14 +76,14 @@ void newBuilderInstantiatesJettyHttpClientBuilderWithSameSettings() throws Excep originalBuilder, httpClient, webSocketClient, Collections.emptyList(), null)) { // When final var result = firstClient.newBuilder() - .readTimeout(313373, TimeUnit.SECONDS); + .readTimeout(313373, TimeUnit.SECONDS); // Then assertThat(result) .isNotNull() .isInstanceOf(DerivedJettyHttpClientBuilder.class) .isNotSameAs(originalBuilder); final var expected = Map.of( - "tlsVersions", new TlsVersion[]{TlsVersion.SSL_3_0}, + "tlsVersions", new TlsVersion[] { TlsVersion.SSL_3_0 }, "followAllRedirects", true); for (var entry : expected.entrySet()) { final var field = JettyHttpClientBuilder.class.getDeclaredField(entry.getKey()); @@ -132,7 +132,7 @@ void sendAsyncUnsupportedHttpRequest() { @DisplayName("newWebSocketBuilder instantiates a JettyWebSocketBuilder") void newWebSocketBuilderInstantiatesJettyWebSocketBuilder() { try (var jettyHttpClient = new JettyHttpClient( - new JettyHttpClientBuilder(null), httpClient, webSocketClient, Collections.emptyList(), null)) { + new JettyHttpClientBuilder(null), httpClient, webSocketClient, Collections.emptyList(), null)) { // When final var result = jettyHttpClient.newWebSocketBuilder(); // Then diff --git a/httpclient-jetty/src/test/java/io/fabric8/kubernetes/client/jetty/JettyWebSocketTest.java b/httpclient-jetty/src/test/java/io/fabric8/kubernetes/client/jetty/JettyWebSocketTest.java index a743f8bd3fd..75e05f5ca3a 100644 --- a/httpclient-jetty/src/test/java/io/fabric8/kubernetes/client/jetty/JettyWebSocketTest.java +++ b/httpclient-jetty/src/test/java/io/fabric8/kubernetes/client/jetty/JettyWebSocketTest.java @@ -229,7 +229,7 @@ void sendIncreasesQueueSize() { jws.setWebSocketSession(session); when(session.isOpen()).thenReturn(true); // When - jws.send(ByteBuffer.wrap(new byte[] {1, 3, 3, 7})); + jws.send(ByteBuffer.wrap(new byte[] { 1, 3, 3, 7 })); // Then assertThat(jws.queueSize()).isEqualTo(4L); } diff --git a/httpclient-okhttp/src/main/java/io/fabric8/kubernetes/client/okhttp/OkHttpClientImpl.java b/httpclient-okhttp/src/main/java/io/fabric8/kubernetes/client/okhttp/OkHttpClientImpl.java index eb43302bb4a..f0d3262ba6e 100644 --- a/httpclient-okhttp/src/main/java/io/fabric8/kubernetes/client/okhttp/OkHttpClientImpl.java +++ b/httpclient-okhttp/src/main/java/io/fabric8/kubernetes/client/okhttp/OkHttpClientImpl.java @@ -211,7 +211,7 @@ public CompletableFuture> consumeLines(HttpRequest reque Function handler = s -> new OkHttpAsyncBody(consumer, s) { @Override protected String process(BufferedSource source) throws IOException { - return source.readUtf8LineStrict(); + return source.readUtf8Line(); } }; return sendAsync(request, handler); diff --git a/httpclient-okhttp/src/test/java/io/fabric8/kubernetes/client/okhttp/OkHttpInterceptorTest.java b/httpclient-okhttp/src/test/java/io/fabric8/kubernetes/client/okhttp/OkHttpInterceptorTest.java index 35155c6b403..523521ced78 100644 --- a/httpclient-okhttp/src/test/java/io/fabric8/kubernetes/client/okhttp/OkHttpInterceptorTest.java +++ b/httpclient-okhttp/src/test/java/io/fabric8/kubernetes/client/okhttp/OkHttpInterceptorTest.java @@ -17,7 +17,6 @@ import io.fabric8.kubernetes.client.http.AbstractInterceptorTest; import io.fabric8.kubernetes.client.http.HttpClient; -import org.junit.jupiter.api.Disabled; @SuppressWarnings("java:S2187") public class OkHttpInterceptorTest extends AbstractInterceptorTest { @@ -25,16 +24,4 @@ public class OkHttpInterceptorTest extends AbstractInterceptorTest { protected HttpClient.Factory getHttpClientFactory() { return new OkHttpClientFactory(); } - - // TODO: Check implementation - @Disabled("TODO: Check implementation") - @Override - public void afterHttpFailureReplacesResponseInConsumeLines() { - } - - // TODO: Check implementation - @Disabled("TODO: Check implementation") - @Override - public void afterHttpFailureReplacesResponseInConsumeBytes() { - } } diff --git a/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/http/AbstractAsyncBodyTest.java b/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/http/AbstractAsyncBodyTest.java index f15715f5221..a55e868e60f 100644 --- a/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/http/AbstractAsyncBodyTest.java +++ b/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/http/AbstractAsyncBodyTest.java @@ -60,7 +60,7 @@ public void consumeLinesProcessedAfterConsume() throws Exception { client.newHttpRequestBuilder().uri(server.url("/consume-lines")).build(), (value, asyncBody) -> { responseText.append(value); - asyncBody.done().complete(null); // OkHttp requires this, not sure if it should + asyncBody.consume(); }) .get(10L, TimeUnit.SECONDS); assertThat(responseText).isEmpty(); @@ -80,7 +80,10 @@ public void consumeLinesNotProcessedIfCancelled() throws Exception { final StringBuffer responseText = new StringBuffer(); final HttpResponse asyncBodyResponse = client .consumeLines(client.newHttpRequestBuilder() - .uri(server.url("/cancel")).build(), (value, asyncBody) -> responseText.append(value)) + .uri(server.url("/cancel")).build(), (value, asyncBody) -> { + responseText.append(value); + asyncBody.consume(); + }) .get(10L, TimeUnit.SECONDS); asyncBodyResponse.body().cancel(); asyncBodyResponse.body().consume(); @@ -103,7 +106,7 @@ public void consumeByteBufferLinesProcessedAfterConsume() throws Exception { (value, asyncBody) -> { responseText.append(value.stream().map(StandardCharsets.UTF_8::decode) .map(CharBuffer::toString).collect(Collectors.joining())); - asyncBody.done().complete(null); // OkHttp requires this, not sure if it should + asyncBody.consume(); }) .get(10L, TimeUnit.SECONDS); assertThat(responseText).isEmpty(); diff --git a/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/http/AbstractInterceptorTest.java b/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/http/AbstractInterceptorTest.java index 958abd804c5..5ea4f843da0 100644 --- a/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/http/AbstractInterceptorTest.java +++ b/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/http/AbstractInterceptorTest.java @@ -26,7 +26,6 @@ import java.util.Collections; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; import static org.assertj.core.api.Assertions.assertThat; @@ -106,16 +105,19 @@ public CompletableFuture afterFailure(BasicBuilder builder, HttpRespons return CompletableFuture.completedFuture(true); } }); - final AtomicReference result = new AtomicReference<>(); + final CompletableFuture result = new CompletableFuture<>(); // When try (HttpClient client = builder.build()) { final HttpResponse asyncR = client.consumeLines( - client.newHttpRequestBuilder().uri(server.url("/not-found")).build(), (s, ab) -> result.set(s)) + client.newHttpRequestBuilder().uri(server.url("/not-found")).build(), (s, ab) -> { + result.complete(s); + ab.consume(); + }) .get(10L, TimeUnit.SECONDS); asyncR.body().consume(); asyncR.body().done().get(10L, TimeUnit.SECONDS); // Then - assertThat(result).hasValue("This works"); + assertThat(result.get()).isEqualTo("This works"); } } @@ -132,17 +134,20 @@ public CompletableFuture afterFailure(BasicBuilder builder, HttpRespons return CompletableFuture.completedFuture(true); } }); - final AtomicReference result = new AtomicReference<>(); + final CompletableFuture result = new CompletableFuture<>(); // When try (HttpClient client = builder.build()) { final HttpResponse asyncR = client.consumeBytes( client.newHttpRequestBuilder().uri(server.url("/not-found")).build(), - (s, ab) -> result.set(StandardCharsets.UTF_8.decode(s.iterator().next()).toString())) + (s, ab) -> { + result.complete(StandardCharsets.UTF_8.decode(s.iterator().next()).toString()); + ab.consume(); + }) .get(10L, TimeUnit.SECONDS); asyncR.body().consume(); asyncR.body().done().get(10L, TimeUnit.SECONDS); // Then - assertThat(result).hasValue("This works"); + assertThat(result.get()).isEqualTo("This works"); } }