From 3cb5494fb0c53631353ade9c7bfd73703456b33c Mon Sep 17 00:00:00 2001 From: Marc Nuri Date: Mon, 30 May 2022 19:02:30 +0200 Subject: [PATCH] feat: Jetty HttpClient implementation Signed-off-by: Marc Nuri --- .github/workflows/e2e-httpclient-tests.yml | 4 +- .github/workflows/release-snapshots.yaml | 2 +- httpclient-jdk/pom.xml | 24 +- .../jdkhttp/JdkHttpClientBuilderImpl.java | 5 +- .../client/jdkhttp/JdkHttpClientImpl.java | 14 +- .../client/jdkhttp/JdkHttpRequestImpl.java | 10 +- .../jdkhttp/JdkHttpClientAsyncBodyTest.java | 50 ++++ .../jdkhttp/JdkHttpClientInterceptorTest.java | 40 +++ .../client/jdkhttp/JdkHttpClientPostTest.java | 27 ++ httpclient-jetty/pom.xml | 191 ++++++++++++++ .../jetty/DerivedJettyHttpClientBuilder.java | 68 +++++ .../jetty/JettyAsyncResponseListener.java | 92 +++++++ .../client/jetty/JettyHttpClient.java | 191 ++++++++++++++ .../client/jetty/JettyHttpClientBuilder.java | 172 ++++++++++++ .../client/jetty/JettyHttpClientFactory.java | 36 +++ .../client/jetty/JettyHttpResponse.java | 126 +++++++++ .../client/jetty/JettyWebSocket.java | 155 +++++++++++ .../client/jetty/JettyWebSocketBuilder.java | 94 +++++++ ....kubernetes.client.http.HttpClient$Factory | 17 ++ .../client/jetty/JettyAsyncBodyTest.java | 27 ++ .../jetty/JettyHttpClientBuilderTest.java | 49 ++++ .../jetty/JettyHttpClientFactoryTest.java | 44 ++++ .../client/jetty/JettyHttpClientTest.java | 157 +++++++++++ .../client/jetty/JettyHttpPostTest.java | 27 ++ .../client/jetty/JettyHttpResponseTest.java | 77 ++++++ .../client/jetty/JettyInterceptorTest.java | 27 ++ .../jetty/JettyWebSocketBuilderTest.java | 109 ++++++++ .../client/jetty/JettyWebSocketSendTest.java | 74 ++++++ .../client/jetty/JettyWebSocketTest.java | 249 ++++++++++++++++++ httpclient-okhttp/pom.xml | 26 +- .../okhttp/OkHttpClientBuilderImpl.java | 4 +- .../client/okhttp/OkHttpClientImpl.java | 5 + .../client/okhttp/OkHttpRequestImpl.java | 6 + .../client/okhttp/OkHttpAsyncBodyTest.java | 27 ++ .../client/okhttp/OkHttpInterceptorTest.java | 40 +++ .../client/okhttp/OkHttpPostTest.java | 27 ++ httpclient-tests/pom.xml | 17 +- .../client/http/JettyHttpClientTest.java | 27 ++ .../client/http/OkHttpClientTest.java | 65 ++--- kubernetes-client-api/pom.xml | 6 +- .../client/http/AbstractBasicBuilder.java | 66 +++++ .../kubernetes/client/http/HttpClient.java | 47 +++- .../kubernetes/client/http/HttpHeaders.java | 10 +- .../client/http/StandardHttpHeaders.java | 44 ++++ .../client/http/StandardHttpRequest.java | 152 +++++++++++ .../client/http/StandardMediaTypes.java | 25 ++ .../kubernetes/client/http/WebSocket.java | 34 +++ .../client/http/AbstractAsyncBodyTest.java | 116 ++++++++ .../client/http/AbstractHttpPostTest.java | 107 ++++++++ .../client/http/AbstractInterceptorTest.java | 175 ++++++++++++ .../client/http/StandardHttpRequestTest.java | 107 ++++++++ .../client/http/TestHttpHeaders.java | 7 +- .../kubernetes/client/http/WebSocketTest.java | 49 ++++ .../fabric8/kubernetes/client/BaseClient.java | 12 +- kubernetes-itests/pom.xml | 28 ++ .../java/io/fabric8/kubernetes/PodIT.java | 48 ++-- pom.xml | 39 ++- 57 files changed, 3363 insertions(+), 111 deletions(-) create mode 100644 httpclient-jdk/src/test/java/io/fabric8/kubernetes/client/jdkhttp/JdkHttpClientAsyncBodyTest.java create mode 100644 httpclient-jdk/src/test/java/io/fabric8/kubernetes/client/jdkhttp/JdkHttpClientInterceptorTest.java create mode 100644 httpclient-jdk/src/test/java/io/fabric8/kubernetes/client/jdkhttp/JdkHttpClientPostTest.java create mode 100644 httpclient-jetty/pom.xml create mode 100644 httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/DerivedJettyHttpClientBuilder.java create mode 100644 httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyAsyncResponseListener.java create mode 100644 httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyHttpClient.java create mode 100644 httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyHttpClientBuilder.java create mode 100644 httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyHttpClientFactory.java create mode 100644 httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyHttpResponse.java create mode 100644 httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyWebSocket.java create mode 100644 httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyWebSocketBuilder.java create mode 100644 httpclient-jetty/src/main/resources/META-INF/services/io.fabric8.kubernetes.client.http.HttpClient$Factory create mode 100644 httpclient-jetty/src/test/java/io/fabric8/kubernetes/client/jetty/JettyAsyncBodyTest.java create mode 100644 httpclient-jetty/src/test/java/io/fabric8/kubernetes/client/jetty/JettyHttpClientBuilderTest.java create mode 100644 httpclient-jetty/src/test/java/io/fabric8/kubernetes/client/jetty/JettyHttpClientFactoryTest.java create mode 100644 httpclient-jetty/src/test/java/io/fabric8/kubernetes/client/jetty/JettyHttpClientTest.java create mode 100644 httpclient-jetty/src/test/java/io/fabric8/kubernetes/client/jetty/JettyHttpPostTest.java create mode 100644 httpclient-jetty/src/test/java/io/fabric8/kubernetes/client/jetty/JettyHttpResponseTest.java create mode 100644 httpclient-jetty/src/test/java/io/fabric8/kubernetes/client/jetty/JettyInterceptorTest.java create mode 100644 httpclient-jetty/src/test/java/io/fabric8/kubernetes/client/jetty/JettyWebSocketBuilderTest.java create mode 100644 httpclient-jetty/src/test/java/io/fabric8/kubernetes/client/jetty/JettyWebSocketSendTest.java create mode 100644 httpclient-jetty/src/test/java/io/fabric8/kubernetes/client/jetty/JettyWebSocketTest.java create mode 100644 httpclient-okhttp/src/test/java/io/fabric8/kubernetes/client/okhttp/OkHttpAsyncBodyTest.java create mode 100644 httpclient-okhttp/src/test/java/io/fabric8/kubernetes/client/okhttp/OkHttpInterceptorTest.java create mode 100644 httpclient-okhttp/src/test/java/io/fabric8/kubernetes/client/okhttp/OkHttpPostTest.java create mode 100644 httpclient-tests/src/test/java/io/fabric8/kubernetes/client/http/JettyHttpClientTest.java create mode 100644 kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/AbstractBasicBuilder.java create mode 100644 kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/StandardHttpHeaders.java create mode 100644 kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/StandardHttpRequest.java create mode 100644 kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/StandardMediaTypes.java create mode 100644 kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/http/AbstractAsyncBodyTest.java create mode 100644 kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/http/AbstractHttpPostTest.java create mode 100644 kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/http/AbstractInterceptorTest.java create mode 100644 kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/http/StandardHttpRequestTest.java create mode 100644 kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/http/WebSocketTest.java diff --git a/.github/workflows/e2e-httpclient-tests.yml b/.github/workflows/e2e-httpclient-tests.yml index 2ef9ad316a6..66b97ba508a 100644 --- a/.github/workflows/e2e-httpclient-tests.yml +++ b/.github/workflows/e2e-httpclient-tests.yml @@ -39,7 +39,7 @@ jobs: fail-fast: false matrix: kubernetes: [v1.24.0,v1.23.3, v1.12.10] - httpclient: [jdk] + httpclient: [jdk,jetty] steps: - name: Checkout uses: actions/checkout@v3 @@ -67,7 +67,7 @@ jobs: fail-fast: false matrix: openshift: [v3.11.0, v3.10.0] - httpclient: [jdk] + httpclient: [jdk,jetty] steps: - name: Checkout uses: actions/checkout@v3 diff --git a/.github/workflows/release-snapshots.yaml b/.github/workflows/release-snapshots.yaml index 3ad21e38d2f..603c5661c5e 100644 --- a/.github/workflows/release-snapshots.yaml +++ b/.github/workflows/release-snapshots.yaml @@ -84,4 +84,4 @@ jobs: gpg-private-key: ${{ secrets.SIGNINGKEY }} gpg-passphrase: SIGNINGPASSWORD - name: Build and release Java 11 modules - run: ./mvnw ${MAVEN_ARGS} ${RELEASE_MAVEN_ARGS} -pl "httpclient-jdk" clean deploy + run: ./mvnw ${MAVEN_ARGS} ${RELEASE_MAVEN_ARGS} -pl "httpclient-jdk" -pl "httpclient-jetty" clean deploy diff --git a/httpclient-jdk/pom.xml b/httpclient-jdk/pom.xml index 2c3087a2fc3..8230c7bba3e 100644 --- a/httpclient-jdk/pom.xml +++ b/httpclient-jdk/pom.xml @@ -26,7 +26,7 @@ kubernetes-httpclient-jdk jar - Fabric8 :: Kubernetes :: JDK HttpClient + Fabric8 :: Kubernetes :: HttpClient :: JDK 11 @@ -50,6 +50,28 @@ io.fabric8 kubernetes-client-api + + + io.fabric8 + kubernetes-client-api + test-jar + test + + + org.junit.jupiter + junit-jupiter-engine + test + + + io.fabric8 + mockwebserver + test + + + org.assertj + assertj-core + test + diff --git a/httpclient-jdk/src/main/java/io/fabric8/kubernetes/client/jdkhttp/JdkHttpClientBuilderImpl.java b/httpclient-jdk/src/main/java/io/fabric8/kubernetes/client/jdkhttp/JdkHttpClientBuilderImpl.java index dfbcae79ca8..088e659611e 100644 --- a/httpclient-jdk/src/main/java/io/fabric8/kubernetes/client/jdkhttp/JdkHttpClientBuilderImpl.java +++ b/httpclient-jdk/src/main/java/io/fabric8/kubernetes/client/jdkhttp/JdkHttpClientBuilderImpl.java @@ -181,7 +181,7 @@ public Builder preferHttp11() { } @Override - public Builder tlsVersions(TlsVersion[] tlsVersions) { + public Builder tlsVersions(TlsVersion... tlsVersions) { this.tlsVersions = tlsVersions; return this; } @@ -192,7 +192,6 @@ public JdkHttpClientBuilderImpl copy(java.net.http.HttpClient httpClient) { copy.readTimeout = this.readTimeout; copy.sslContext = this.sslContext; copy.interceptors = new LinkedHashMap<>(this.interceptors); - copy.followRedirects = this.followRedirects; copy.proxyAddress = this.proxyAddress; copy.proxyAuthorization = this.proxyAuthorization; copy.tlsVersions = this.tlsVersions; @@ -202,4 +201,4 @@ public JdkHttpClientBuilderImpl copy(java.net.http.HttpClient httpClient) { return copy; } -} \ No newline at end of file +} diff --git a/httpclient-jdk/src/main/java/io/fabric8/kubernetes/client/jdkhttp/JdkHttpClientImpl.java b/httpclient-jdk/src/main/java/io/fabric8/kubernetes/client/jdkhttp/JdkHttpClientImpl.java index d006b27689a..168f22df6ef 100644 --- a/httpclient-jdk/src/main/java/io/fabric8/kubernetes/client/jdkhttp/JdkHttpClientImpl.java +++ b/httpclient-jdk/src/main/java/io/fabric8/kubernetes/client/jdkhttp/JdkHttpClientImpl.java @@ -35,6 +35,7 @@ import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; @@ -132,6 +133,11 @@ public List headers(String key) { return response.headers().allValues(key); } + @Override + public Map> headers() { + return response.headers().map(); + } + @Override public int code() { return response.statusCode(); @@ -334,13 +340,7 @@ public CompletableFuture internalBuildAsync(JdkWebSocketImpl. // use a responseholder to convey both the exception and the websocket CompletableFuture response = new CompletableFuture<>(); - URI uri = request.uri(); - if (uri.getScheme().startsWith("http")) { - // the jdk logic expects a ws uri - // after the https://bugs.java.com/bugdatabase/view_bug.do?bug_id=8245245 it just does the reverse of this - // to convert back to http(s) ... - uri = URI.create("ws" + uri.toString().substring(4)); - } + URI uri = WebSocket.toWebSocketUri(request.uri()); newBuilder.buildAsync(uri, new JdkWebSocketImpl.ListenerAdapter(listener, queueSize)).whenComplete((w, t) -> { if (t instanceof CompletionException && t.getCause() != null) { t = t.getCause(); diff --git a/httpclient-jdk/src/main/java/io/fabric8/kubernetes/client/jdkhttp/JdkHttpRequestImpl.java b/httpclient-jdk/src/main/java/io/fabric8/kubernetes/client/jdkhttp/JdkHttpRequestImpl.java index 803f7632304..fc130c074a2 100644 --- a/httpclient-jdk/src/main/java/io/fabric8/kubernetes/client/jdkhttp/JdkHttpRequestImpl.java +++ b/httpclient-jdk/src/main/java/io/fabric8/kubernetes/client/jdkhttp/JdkHttpRequestImpl.java @@ -26,11 +26,12 @@ import java.nio.ByteBuffer; import java.time.Duration; import java.util.List; +import java.util.Map; import java.util.concurrent.Flow.Subscriber; -class JdkHttpRequestImpl implements HttpRequest { +import static io.fabric8.kubernetes.client.http.StandardHttpHeaders.CONTENT_TYPE; - private static final String CONTENT_TYPE = "Content-Type"; +class JdkHttpRequestImpl implements HttpRequest { public static class BuilderImpl implements Builder { @@ -141,6 +142,11 @@ public List headers(String key) { return request.headers().allValues(key); } + @Override + public Map> headers() { + return request.headers().map(); + } + @Override public URI uri() { return request.uri(); diff --git a/httpclient-jdk/src/test/java/io/fabric8/kubernetes/client/jdkhttp/JdkHttpClientAsyncBodyTest.java b/httpclient-jdk/src/test/java/io/fabric8/kubernetes/client/jdkhttp/JdkHttpClientAsyncBodyTest.java new file mode 100644 index 00000000000..b1935142275 --- /dev/null +++ b/httpclient-jdk/src/test/java/io/fabric8/kubernetes/client/jdkhttp/JdkHttpClientAsyncBodyTest.java @@ -0,0 +1,50 @@ +/** + * Copyright (C) 2015 Red Hat, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.fabric8.kubernetes.client.jdkhttp; + +import io.fabric8.kubernetes.client.http.AbstractAsyncBodyTest; +import io.fabric8.kubernetes.client.http.HttpClient; +import org.junit.jupiter.api.Disabled; + + +@SuppressWarnings("java:S2187") +public class JdkHttpClientAsyncBodyTest extends AbstractAsyncBodyTest { + @Override + protected HttpClient.Factory getHttpClientFactory() { + return new JdkHttpClientFactory(); + } + + // TODO: Check tests validate expected behavior + @Disabled("TODO: Check tests validate expected behavior") + @Override + public void consumeLinesProcessedAfterConsume() throws Exception { + super.consumeLinesProcessedAfterConsume(); + } + + // TODO: Check tests validate expected behavior + @Disabled("TODO: Check tests validate expected behavior") + @Override + public void consumeLinesNotProcessedIfCancelled() throws Exception { + super.consumeLinesNotProcessedIfCancelled(); + } + + // TODO: Check tests validate expected behavior + @Disabled("TODO: Check tests validate expected behavior") + @Override + public void consumeByteBufferLinesProcessedAfterConsume() throws Exception { + super.consumeByteBufferLinesProcessedAfterConsume(); + } +} diff --git a/httpclient-jdk/src/test/java/io/fabric8/kubernetes/client/jdkhttp/JdkHttpClientInterceptorTest.java b/httpclient-jdk/src/test/java/io/fabric8/kubernetes/client/jdkhttp/JdkHttpClientInterceptorTest.java new file mode 100644 index 00000000000..5dfe39ee11b --- /dev/null +++ b/httpclient-jdk/src/test/java/io/fabric8/kubernetes/client/jdkhttp/JdkHttpClientInterceptorTest.java @@ -0,0 +1,40 @@ +/** + * Copyright (C) 2015 Red Hat, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.fabric8.kubernetes.client.jdkhttp; + +import io.fabric8.kubernetes.client.http.AbstractInterceptorTest; +import io.fabric8.kubernetes.client.http.HttpClient; +import org.junit.jupiter.api.Disabled; + +@SuppressWarnings("java:S2187") +public class JdkHttpClientInterceptorTest extends AbstractInterceptorTest { + @Override + protected HttpClient.Factory getHttpClientFactory() { + return new JdkHttpClientFactory(); + } + + // TODO: Check implementation + @Disabled("TODO: Check implementation") + @Override + public void afterHttpFailureReplacesResponseInConsumeLines() { + } + + // TODO: Check implementation + @Disabled("TODO: Check implementation") + @Override + public void afterHttpFailureReplacesResponseInConsumeBytes() { + } +} diff --git a/httpclient-jdk/src/test/java/io/fabric8/kubernetes/client/jdkhttp/JdkHttpClientPostTest.java b/httpclient-jdk/src/test/java/io/fabric8/kubernetes/client/jdkhttp/JdkHttpClientPostTest.java new file mode 100644 index 00000000000..68a5e36148a --- /dev/null +++ b/httpclient-jdk/src/test/java/io/fabric8/kubernetes/client/jdkhttp/JdkHttpClientPostTest.java @@ -0,0 +1,27 @@ +/** + * Copyright (C) 2015 Red Hat, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.fabric8.kubernetes.client.jdkhttp; + +import io.fabric8.kubernetes.client.http.AbstractHttpPostTest; +import io.fabric8.kubernetes.client.http.HttpClient; + +@SuppressWarnings("java:S2187") +public class JdkHttpClientPostTest extends AbstractHttpPostTest { + @Override + protected HttpClient.Factory getHttpClientFactory() { + return new JdkHttpClientFactory(); + } +} diff --git a/httpclient-jetty/pom.xml b/httpclient-jetty/pom.xml new file mode 100644 index 00000000000..8ac51488e1e --- /dev/null +++ b/httpclient-jetty/pom.xml @@ -0,0 +1,191 @@ + + + + 4.0.0 + + kubernetes-client-project + io.fabric8 + 6.0-SNAPSHOT + + + kubernetes-httpclient-jetty + jar + Fabric8 :: Kubernetes :: HttpClient :: Jetty + + + 11 + + osgi.extender; + filter:="(osgi.extender=osgi.serviceloader.registrar)", + + + !android.util*, + *, + + + io.fabric8.kubernetes.client.jetty*;-noimport:=true, + + + + + + + + io.fabric8 + kubernetes-client-api + + + org.eclipse.jetty + jetty-client + + + org.eclipse.jetty.http2 + http2-http-client-transport + + + org.eclipse.jetty.websocket + websocket-jetty-client + + + + io.fabric8 + kubernetes-client-api + test-jar + test + + + org.junit.jupiter + junit-jupiter-engine + + + org.junit.jupiter + junit-jupiter-params + + + io.fabric8 + mockwebserver + test + + + org.mockito + mockito-inline + + + org.assertj + assertj-core + + + + + + + org.apache.maven.plugins + maven-surefire-plugin + + + + + + + + + + + + + + + + + + + + + + + + TLSv1.2,TLSv1.1,TLSv1 + + + + + org.codehaus.mojo + exec-maven-plugin + 3.0.0 + + + + java + + + + + test + + + + org.jacoco + jacoco-maven-plugin + + + report-aggregate + verify + + report-aggregate + + + + + + org.apache.felix + maven-bundle-plugin + ${maven.bundle.plugin.version} + + + bundle + package + + bundle + + + + ${project.name} + ${project.groupId}.${project.artifactId} + ${osgi.export} + ${osgi.import} + ${osgi.dynamic.import} + ${osgi.require-capability} + ${osgi.provide-capability} + ${osgi.private} + ${osgi.bundles} + ${osgi.activator} + ${osgi.export.service} + + /META-INF/services/io.fabric8.kubernetes.client.http.HttpClient$Factory=target/classes/META-INF/services/io.fabric8.kubernetes.client.http.HttpClient$Factory, + + + bundle + + + + + + + + diff --git a/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/DerivedJettyHttpClientBuilder.java b/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/DerivedJettyHttpClientBuilder.java new file mode 100644 index 00000000000..dcb0a8e3aeb --- /dev/null +++ b/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/DerivedJettyHttpClientBuilder.java @@ -0,0 +1,68 @@ +/** + * Copyright (C) 2015 Red Hat, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.fabric8.kubernetes.client.jetty; + +import io.fabric8.kubernetes.client.http.HttpClient; +import io.fabric8.kubernetes.client.http.Interceptor; + +import java.time.Duration; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +@SuppressWarnings("unchecked") +public abstract class DerivedJettyHttpClientBuilder implements HttpClient.DerivedClientBuilder { + + final JettyHttpClientFactory factory; + Duration readTimeout = Duration.ZERO; + Duration writeTimeout = Duration.ZERO; + final Map interceptors; + + DerivedJettyHttpClientBuilder(JettyHttpClientFactory factory) { + this.factory = factory; + interceptors = new LinkedHashMap<>(); + } + + @Override + public final T readTimeout(long readTimeout, TimeUnit unit) { + this.readTimeout = Duration.ofNanos(unit.toNanos(readTimeout)); + return (T) this; + } + + @Override + public T writeTimeout(long writeTimeout, TimeUnit unit) { + this.writeTimeout = Duration.ofNanos(unit.toNanos(writeTimeout)); + return (T) this; + } + + @Override + public T forStreaming() { + // NO OP + return (T) this; + } + + @Override + public T authenticatorNone() { + // NO OP + return (T) this; + } + + @Override + public T addOrReplaceInterceptor(String name, Interceptor interceptor) { + interceptors.put(name, interceptor); + return (T) this; + } +} diff --git a/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyAsyncResponseListener.java b/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyAsyncResponseListener.java new file mode 100644 index 00000000000..b71a02433f2 --- /dev/null +++ b/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyAsyncResponseListener.java @@ -0,0 +1,92 @@ +/** + * Copyright (C) 2015 Red Hat, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.fabric8.kubernetes.client.jetty; + +import io.fabric8.kubernetes.client.KubernetesClientException; +import io.fabric8.kubernetes.client.http.HttpClient; +import io.fabric8.kubernetes.client.http.HttpRequest; +import io.fabric8.kubernetes.client.http.HttpResponse; +import org.eclipse.jetty.client.api.Request; +import org.eclipse.jetty.client.api.Response; +import org.eclipse.jetty.client.api.Result; + +import java.nio.ByteBuffer; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; + +public abstract class JettyAsyncResponseListener extends Response.Listener.Adapter implements HttpClient.AsyncBody { + + private final HttpRequest httpRequest; + private final HttpClient.BodyConsumer bodyConsumer; + private final CompletableFuture> asyncResponse; + private final CompletableFuture asyncBodyDone; + private final CountDownLatch consumeLock; + + JettyAsyncResponseListener(HttpRequest httpRequest, HttpClient.BodyConsumer bodyConsumer) { + this.httpRequest = httpRequest; + this.bodyConsumer = bodyConsumer; + asyncResponse = new CompletableFuture<>(); + asyncBodyDone = new CompletableFuture<>(); + consumeLock = new CountDownLatch(1); + } + + @Override + public void consume() { + consumeLock.countDown(); + } + + @Override + public CompletableFuture done() { + return asyncBodyDone; + } + + @Override + public void cancel() { + asyncBodyDone.cancel(false); + } + + @Override + public void onBegin(Response response) { + asyncResponse.complete(new JettyHttpResponse<>(httpRequest, response, this)); + } + + @Override + public void onComplete(Result result) { + asyncBodyDone.complete(null); + } + + public CompletableFuture> listen(Request request) { + request.send(this); + return asyncResponse; + } + + @Override + public void onContent(Response response, ByteBuffer content) { + try { + consumeLock.await(); + if (!asyncBodyDone.isCancelled()) { + bodyConsumer.consume(process(response, content), this); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw KubernetesClientException.launderThrowable(e); + } catch (Exception e) { + throw KubernetesClientException.launderThrowable(e); + } + } + + protected abstract T process(Response response, ByteBuffer content); +} diff --git a/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyHttpClient.java b/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyHttpClient.java new file mode 100644 index 00000000000..df6f4262ca5 --- /dev/null +++ b/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyHttpClient.java @@ -0,0 +1,191 @@ +/** + * Copyright (C) 2015 Red Hat, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.fabric8.kubernetes.client.jetty; + +import io.fabric8.kubernetes.client.KubernetesClientException; +import io.fabric8.kubernetes.client.http.HttpRequest; +import io.fabric8.kubernetes.client.http.HttpResponse; +import io.fabric8.kubernetes.client.http.Interceptor; +import io.fabric8.kubernetes.client.http.StandardHttpRequest; +import io.fabric8.kubernetes.client.http.WebSocket; +import org.eclipse.jetty.client.HttpClient; +import org.eclipse.jetty.client.api.Request; +import org.eclipse.jetty.client.api.Response; +import org.eclipse.jetty.client.api.Result; +import org.eclipse.jetty.client.util.BufferingResponseListener; +import org.eclipse.jetty.client.util.InputStreamRequestContent; +import org.eclipse.jetty.client.util.StringRequestContent; +import org.eclipse.jetty.websocket.client.WebSocketClient; + +import java.nio.ByteBuffer; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; + +import static io.fabric8.kubernetes.client.http.StandardMediaTypes.APPLICATION_OCTET_STREAM; +import static io.fabric8.kubernetes.client.http.StandardMediaTypes.TEXT_PLAIN; +import static org.eclipse.jetty.util.BufferUtil.toArray; + +public class JettyHttpClient implements io.fabric8.kubernetes.client.http.HttpClient { + + private final HttpClient jetty; + private final WebSocketClient jettyWs; + private final Collection interceptors; + private final JettyHttpClientBuilder builder; + private final JettyHttpClientFactory factory; + + public JettyHttpClient(JettyHttpClientBuilder builder, HttpClient httpClient, WebSocketClient webSocketClient, + Collection interceptors, JettyHttpClientFactory jettyHttpClientFactory) { + this.builder = builder; + this.jetty = httpClient; + this.jettyWs = webSocketClient; + this.interceptors = interceptors; + this.factory = jettyHttpClientFactory; + } + + @Override + public void close() { + try { + jetty.stop(); + jettyWs.stop(); + } catch (Exception e) { + throw KubernetesClientException.launderThrowable(e); + } + } + + @Override + public DerivedClientBuilder newBuilder() { + return builder.copy(); + } + + @Override + public CompletableFuture> sendAsync(HttpRequest originalRequest, Class type) { + final var supportedResponse = JettyHttpResponse.SupportedResponse.from(type); + final var request = toStandardHttpRequest(originalRequest); + final CompletableFuture> future = new CompletableFuture<>(); + newRequest(request).send(new BufferingResponseListener() { + + // TODO: Long Term Refactor - This Listener blocks until the full response is read and keeps it in memory. + // Find a way to stream the response body without completing the future + // We need two signals, one when the response is received, and one when the body is completely + // read. + // Should this method be completely replaced by consumeXxx()? + @Override + public void onComplete(Result result) { + future.complete(new JettyHttpResponse<>( + request, result.getResponse(), supportedResponse.process(result.getResponse(), getContent(), type))); + } + }); + return interceptResponse(request.toBuilder(), future, r -> sendAsync(r, type)); + } + + @Override + public CompletableFuture> consumeLines( + HttpRequest originalRequest, BodyConsumer consumer) { + final var request = toStandardHttpRequest(originalRequest); + final var future = new JettyAsyncResponseListener<>(request, consumer) { + + @Override + protected String process(Response response, ByteBuffer content) { + return JettyHttpResponse.SupportedResponse.TEXT.process(response, toArray(content), String.class); + } + }.listen(newRequest(request)); + return interceptResponse(request.toBuilder(), future, r -> consumeLines(r, consumer)); + } + + @Override + public CompletableFuture> consumeBytes( + HttpRequest originalRequest, BodyConsumer> consumer) { + final var request = toStandardHttpRequest(originalRequest); + final var future = new JettyAsyncResponseListener<>(request, consumer) { + + @Override + protected List process(Response response, ByteBuffer content) { + return Collections.singletonList(content); + } + }.listen(newRequest(request)); + return interceptResponse(request.toBuilder(), future, r -> consumeBytes(r, consumer)); + } + + @Override + public WebSocket.Builder newWebSocketBuilder() { + return new JettyWebSocketBuilder(jettyWs, builder.readTimeout); + } + + @Override + public HttpRequest.Builder newHttpRequestBuilder() { + return new StandardHttpRequest.Builder(); + } + + @Override + public Factory getFactory() { + return factory; + } + + private Request newRequest(StandardHttpRequest originalRequest) { + try { + jetty.start(); + } catch (Exception e) { + throw KubernetesClientException.launderThrowable(e); + } + final var requestBuilder = originalRequest.toBuilder(); + interceptors.forEach(i -> i.before(requestBuilder, originalRequest)); + final var request = requestBuilder.build(); + + var jettyRequest = jetty.newRequest(request.uri()).method(request.method()); + jettyRequest.timeout(builder.readTimeout.toMillis() + builder.writeTimeout.toMillis(), TimeUnit.MILLISECONDS); + jettyRequest.headers(m -> request.headers().forEach((k, l) -> l.forEach(v -> m.add(k, v)))); + + final var contentType = request.headers("Content-Type").stream().findAny(); + if (request.bodyString() != null) { + jettyRequest.body(new StringRequestContent(contentType.orElse(TEXT_PLAIN), request.bodyString())); + } else if (request.bodyStream() != null) { + jettyRequest.body(new InputStreamRequestContent(contentType.orElse(APPLICATION_OCTET_STREAM), request.bodyStream())); + } + return jettyRequest; + } + + private CompletableFuture> interceptResponse( + StandardHttpRequest.Builder builder, CompletableFuture> originalResponse, + Function>> function) { + for (var interceptor : interceptors) { + originalResponse = originalResponse.thenCompose(r -> { + if (!r.isSuccessful()) { + return interceptor.afterFailure(builder, r) + .thenCompose(b -> { + if (Boolean.TRUE.equals(b)) { + return function.apply(builder.build()); + } + return CompletableFuture.completedFuture(r); + }); + } + return CompletableFuture.completedFuture(r); + }); + } + return originalResponse; + } + + private static StandardHttpRequest toStandardHttpRequest(HttpRequest request) { + if (!(request instanceof StandardHttpRequest)) { + throw new IllegalArgumentException("Only StandardHttpRequest is supported"); + } + return (StandardHttpRequest) request; + } + +} diff --git a/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyHttpClientBuilder.java b/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyHttpClientBuilder.java new file mode 100644 index 00000000000..29498218311 --- /dev/null +++ b/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyHttpClientBuilder.java @@ -0,0 +1,172 @@ +/** + * Copyright (C) 2015 Red Hat, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.fabric8.kubernetes.client.jetty; + +import io.fabric8.kubernetes.client.http.HttpClient.Builder; +import io.fabric8.kubernetes.client.http.TlsVersion; +import io.fabric8.kubernetes.client.internal.SSLUtils; +import org.eclipse.jetty.client.HttpClient; +import org.eclipse.jetty.client.HttpClientTransport; +import org.eclipse.jetty.client.HttpProxy; +import org.eclipse.jetty.client.Origin; +import org.eclipse.jetty.client.api.Request; +import org.eclipse.jetty.client.dynamic.HttpClientTransportDynamic; +import org.eclipse.jetty.client.http.HttpClientConnectionFactory; +import org.eclipse.jetty.client.http.HttpClientTransportOverHTTP; +import org.eclipse.jetty.http2.client.HTTP2Client; +import org.eclipse.jetty.http2.client.http.ClientConnectionFactoryOverHTTP2; +import org.eclipse.jetty.io.ClientConnector; +import org.eclipse.jetty.util.ssl.SslContextFactory; +import org.eclipse.jetty.websocket.client.WebSocketClient; + +import java.net.InetSocketAddress; +import java.time.Duration; +import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; + +import javax.net.ssl.KeyManager; +import javax.net.ssl.SSLContext; +import javax.net.ssl.TrustManager; + +public class JettyHttpClientBuilder extends DerivedJettyHttpClientBuilder + implements Builder { + + private Duration connectTimeout; + private SSLContext sslContext; + private boolean followAllRedirects; + private Origin.Address proxyAddress; + private String proxyAuthorization; + private TlsVersion[] tlsVersions; + // TODO: HTTP2 disabled, MockWebServer support is limited and requires changes + // Enable (preferHttp11->false) the feature after fixing MockWebServer + private boolean preferHttp11 = true; + private HttpClient sharedHttpClient; + private WebSocketClient sharedWebSocketClient; + + public JettyHttpClientBuilder(JettyHttpClientFactory factory) { + super(factory); + } + + @Override + public JettyHttpClient build() { + if (sharedHttpClient != null) { + return new JettyHttpClient(this, sharedHttpClient, sharedWebSocketClient, interceptors.values(), factory); + } + final var sslContextFactory = new SslContextFactory.Client(); + if (sslContext != null) { + sslContextFactory.setSslContext(sslContext); + } + if (tlsVersions != null && tlsVersions.length > 0) { + sslContextFactory.setIncludeProtocols(Stream.of(tlsVersions).map(TlsVersion::javaName).toArray(String[]::new)); + } + sharedHttpClient = new HttpClient(newTransport(sslContextFactory, preferHttp11)); + sharedWebSocketClient = new WebSocketClient(new HttpClient(newTransport(sslContextFactory, preferHttp11))); + sharedWebSocketClient.setIdleTimeout(Duration.ZERO); + if (connectTimeout != null) { + sharedHttpClient.setConnectTimeout(connectTimeout.toMillis()); + sharedWebSocketClient.setConnectTimeout(connectTimeout.toMillis()); // TODO: CHECK + } + sharedHttpClient.setFollowRedirects(followAllRedirects); + if (proxyAddress != null) { + sharedHttpClient.getProxyConfiguration().getProxies().add(new HttpProxy(proxyAddress, false)); + } + if (proxyAddress != null && proxyAuthorization != null) { + sharedHttpClient.getRequestListeners().add(new Request.Listener.Adapter() { + @Override + public void onBegin(Request request) { + request.headers(h -> h.put("Proxy-Authorization", proxyAuthorization)); + } + }); + } + return new JettyHttpClient(this, sharedHttpClient, sharedWebSocketClient, interceptors.values(), factory); + } + + @Override + public Builder connectTimeout(long connectTimeout, TimeUnit unit) { + this.connectTimeout = Duration.ofNanos(unit.toNanos(connectTimeout)); + return this; + } + + @Override + public Builder sslContext(KeyManager[] keyManagers, TrustManager[] trustManagers) { + this.sslContext = SSLUtils.sslContext(keyManagers, trustManagers); + return this; + } + + @Override + public Builder followAllRedirects() { + followAllRedirects = true; + return this; + } + + @Override + public Builder proxyAddress(InetSocketAddress proxyAddress) { + if (proxyAddress == null) { + this.proxyAddress = null; + } else { + this.proxyAddress = new Origin.Address(proxyAddress.getHostString(), proxyAddress.getPort()); + } + return this; + } + + @Override + public Builder proxyAuthorization(String credentials) { + proxyAuthorization = credentials; + return this; + } + + @Override + public Builder tlsVersions(TlsVersion... tlsVersions) { + this.tlsVersions = tlsVersions; + return this; + } + + @Override + public Builder preferHttp11() { + preferHttp11 = true; + return this; + } + + public Builder copy() { + final var ret = new JettyHttpClientBuilder(factory); + ret.sharedHttpClient = sharedHttpClient; + ret.sharedWebSocketClient = sharedWebSocketClient; + ret.readTimeout = readTimeout; + ret.writeTimeout = writeTimeout; + ret.interceptors.putAll(interceptors); + ret.connectTimeout = connectTimeout; + ret.sslContext = sslContext; + ret.followAllRedirects = followAllRedirects; + ret.proxyAddress = proxyAddress; + ret.proxyAuthorization = proxyAuthorization; + ret.tlsVersions = tlsVersions; + ret.preferHttp11 = preferHttp11; + return ret; + } + + private static HttpClientTransport newTransport(SslContextFactory.Client sslContextFactory, boolean preferHttp11) { + final var clientConnector = new ClientConnector(); + clientConnector.setSslContextFactory(sslContextFactory); + final HttpClientTransport transport; + if (preferHttp11) { + transport = new HttpClientTransportOverHTTP(clientConnector); + } else { + var http2 = new ClientConnectionFactoryOverHTTP2.HTTP2(new HTTP2Client(clientConnector)); + transport = new HttpClientTransportDynamic(clientConnector, http2, HttpClientConnectionFactory.HTTP11); + } + return transport; + } +} diff --git a/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyHttpClientFactory.java b/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyHttpClientFactory.java new file mode 100644 index 00000000000..254c35b3610 --- /dev/null +++ b/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyHttpClientFactory.java @@ -0,0 +1,36 @@ +/** + * Copyright (C) 2015 Red Hat, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.fabric8.kubernetes.client.jetty; + +import io.fabric8.kubernetes.client.Config; +import io.fabric8.kubernetes.client.http.HttpClient; + +import static io.fabric8.kubernetes.client.utils.HttpClientUtils.applyCommonConfiguration; + +public class JettyHttpClientFactory implements HttpClient.Factory { + + @Override + public HttpClient createHttpClient(Config config) { + final var builder = newBuilder(); + applyCommonConfiguration(config, builder, this); + return builder.build(); + } + + @Override + public HttpClient.Builder newBuilder() { + return new JettyHttpClientBuilder(this); + } +} diff --git a/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyHttpResponse.java b/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyHttpResponse.java new file mode 100644 index 00000000000..77a66696778 --- /dev/null +++ b/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyHttpResponse.java @@ -0,0 +1,126 @@ +/** + * Copyright (C) 2015 Red Hat, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.fabric8.kubernetes.client.jetty; + +import io.fabric8.kubernetes.client.http.HttpRequest; +import io.fabric8.kubernetes.client.http.HttpResponse; +import io.fabric8.kubernetes.client.utils.Utils; +import org.eclipse.jetty.client.api.Response; + +import java.io.ByteArrayInputStream; +import java.io.InputStreamReader; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.function.BiFunction; + +public class JettyHttpResponse implements HttpResponse { + + private final HttpRequest request; + private final Response response; + private final T body; + + public JettyHttpResponse(HttpRequest request, Response response, T body) { + this.request = request; + this.response = response; + this.body = body; + } + + @Override + public List headers(String key) { + return response.getHeaders().getValuesList(key); + } + + @Override + public Map> headers() { + return response.getHeaders().stream().reduce(new HashMap<>(), (m, e) -> { + m.compute(e.getName(), (k, v) -> { + if (v == null) { + v = new ArrayList<>(); + } + v.add(e.getValue()); + return v; + }); + return m; + }, (m1, m2) -> m1); + } + + @Override + public int code() { + return response.getStatus(); + } + + @Override + public T body() { + return body; + } + + @Override + public HttpRequest request() { + return request; + } + + @Override + public Optional> previousResponse() { + return Optional.empty(); + } + + enum SupportedResponse { + + TEXT(String.class, (r, bytes) -> new String(bytes, responseCharset(r))), + INPUT_STREAM(ByteArrayInputStream.class, (r, bytes) -> new ByteArrayInputStream(bytes)), + READER(InputStreamReader.class, (r, bytes) -> new InputStreamReader(new ByteArrayInputStream(bytes), responseCharset(r))), + BYTE_ARRAY(byte[].class, (r, bytes) -> bytes); + + private final Class type; + private final BiFunction processor; + + SupportedResponse(Class type, BiFunction processor) { + this.type = type; + this.processor = processor; + } + + public T process(Response response, byte[] bytes, Class type) { + return type.cast(processor.apply(response, bytes)); + } + + static SupportedResponse from(Class type) { + for (SupportedResponse sr : SupportedResponse.values()) { + if (type.isAssignableFrom(sr.type)) { + return sr; + } + } + throw new IllegalArgumentException("Unsupported response type: " + type.getName()); + } + + private static Charset responseCharset(Response response) { + var responseCharset = StandardCharsets.UTF_8; + final var responseEncoding = response.getHeaders().get("Content-Encoding"); + if (Utils.isNotNullOrEmpty(responseEncoding)) { + try { + responseCharset = Charset.forName(responseEncoding); + } catch (Exception e) { + // ignored + } + } + return responseCharset; + } + } +} diff --git a/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyWebSocket.java b/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyWebSocket.java new file mode 100644 index 00000000000..53ea3a55b45 --- /dev/null +++ b/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyWebSocket.java @@ -0,0 +1,155 @@ +/** + * Copyright (C) 2015 Red Hat, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.fabric8.kubernetes.client.jetty; + +import io.fabric8.kubernetes.client.KubernetesClientException; +import io.fabric8.kubernetes.client.http.WebSocket; +import org.eclipse.jetty.websocket.api.Session; +import org.eclipse.jetty.websocket.api.WebSocketListener; +import org.eclipse.jetty.websocket.api.WriteCallback; + +import java.nio.ByteBuffer; +import java.nio.channels.ClosedChannelException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +public class JettyWebSocket implements WebSocket, WebSocketListener { + private final WebSocket.Listener listener; + private final AtomicLong sendQueue; + private final Lock lock; + private final Condition backPressure; + private final AtomicBoolean closed; + private boolean moreMessages; + private Session webSocketSession; + + public JettyWebSocket(WebSocket.Listener listener) { + this.listener = listener; + sendQueue = new AtomicLong(); + lock = new ReentrantLock(); + backPressure = lock.newCondition(); + closed = new AtomicBoolean(); + moreMessages = true; + } + + @Override + public boolean send(ByteBuffer buffer) { + if (closed.get() || !webSocketSession.isOpen()) { + return false; + } + final int size = buffer.remaining(); + sendQueue.addAndGet(size); + webSocketSession.getRemote().sendBytes(buffer, new WriteCallback() { + @Override + public void writeFailed(Throwable x) { + sendQueue.addAndGet(-size); + } + + @Override + public void writeSuccess() { + sendQueue.addAndGet(-size); + } + }); + return true; + } + + @Override + public boolean sendClose(int code, String reason) { + if (webSocketSession.isOpen() && !closed.getAndSet(true)) { + webSocketSession.close(code, reason); + return true; + } + return false; + } + + @Override + public long queueSize() { + return sendQueue.get(); + } + + @Override + public void request() { + try { + lock.lock(); + moreMessages = true; + backPressure.signalAll(); + } finally { + lock.unlock(); + } + } + + @Override + public void onWebSocketBinary(byte[] payload, int offset, int len) { + backPressure(); + final var buffer = ByteBuffer.allocate(len); + buffer.put(payload, offset, len).rewind(); + listener.onMessage(this, buffer.asReadOnlyBuffer()); + } + + @Override + public void onWebSocketText(String message) { + backPressure(); + listener.onMessage(this, message); + } + + @Override + public void onWebSocketClose(int statusCode, String reason) { + closed.set(true); + listener.onClose(this, statusCode, reason); + } + + @Override + public void onWebSocketConnect(Session session) { + listener.onOpen(this); + } + + @Override + public void onWebSocketError(Throwable cause) { + if (cause instanceof ClosedChannelException && closed.get()) { + // TODO: Check better + // It appears to be a race condition in Jetty: + // - The server sends a close frame (but we haven't received it) + // - Client enqueues a sendClose -> webSocketSession.close(code, reason) + // - Jetty/client receives the remote close -> onWebSocketClose + // - Jetty sends the enqueued close frame, but the socket was already closed in the previous step + // - Jetty throws a ClosedChannelException + return; + } + listener.onError(this, cause); + } + + public JettyWebSocket setWebSocketSession(Session webSocketSession) { + this.webSocketSession = webSocketSession; + return this; + } + + private void backPressure() { + try { + lock.lock(); + while (!moreMessages) { + backPressure.await(); + } + moreMessages = false; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw KubernetesClientException.launderThrowable(e); + } finally { + lock.unlock(); + } + } +} diff --git a/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyWebSocketBuilder.java b/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyWebSocketBuilder.java new file mode 100644 index 00000000000..b3f9990b5e2 --- /dev/null +++ b/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyWebSocketBuilder.java @@ -0,0 +1,94 @@ +/** + * Copyright (C) 2015 Red Hat, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.fabric8.kubernetes.client.jetty; + +import io.fabric8.kubernetes.client.KubernetesClientException; +import io.fabric8.kubernetes.client.http.AbstractBasicBuilder; +import io.fabric8.kubernetes.client.http.StandardHttpRequest; +import io.fabric8.kubernetes.client.http.WebSocket; +import io.fabric8.kubernetes.client.http.WebSocketHandshakeException; +import io.fabric8.kubernetes.client.utils.Utils; +import org.eclipse.jetty.client.HttpResponse; +import org.eclipse.jetty.websocket.api.exceptions.UpgradeException; +import org.eclipse.jetty.websocket.client.ClientUpgradeRequest; +import org.eclipse.jetty.websocket.client.WebSocketClient; + +import java.time.Duration; +import java.util.Collections; +import java.util.Objects; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.TimeUnit; + +public class JettyWebSocketBuilder extends AbstractBasicBuilder implements WebSocket.Builder { + + private final WebSocketClient webSocketClient; + private final Duration handshakeTimeout; + private String subprotocol; + + public JettyWebSocketBuilder(WebSocketClient webSocketClient, Duration handshakeTimeout) { + this.webSocketClient = webSocketClient; + this.handshakeTimeout = handshakeTimeout; + } + + @Override + public CompletableFuture buildAsync(WebSocket.Listener listener) { + try { + webSocketClient.start(); + final ClientUpgradeRequest cur = new ClientUpgradeRequest(); + if (Utils.isNotNullOrEmpty(subprotocol)) { + cur.setSubProtocols(subprotocol); + } + cur.setHeaders(getHeaders()); + cur.setTimeout(handshakeTimeout.toMillis(), TimeUnit.MILLISECONDS); + // Extra-future required because we can't Map the UpgradeException to a WebSocketHandshakeException easily + final CompletableFuture future = new CompletableFuture<>(); + final var webSocket = new JettyWebSocket(listener); + return webSocketClient.connect(webSocket, Objects.requireNonNull(WebSocket.toWebSocketUri(getUri())), cur) + .thenApply(webSocket::setWebSocketSession) + .exceptionally(ex -> { + if (ex instanceof CompletionException && ex.getCause() instanceof UpgradeException) { + future.completeExceptionally(toHandshakeException((UpgradeException) ex.getCause())); + } else if (ex instanceof UpgradeException) { + future.completeExceptionally(toHandshakeException((UpgradeException) ex)); + } else { + future.completeExceptionally(ex); + } + return null; + }) + .thenCompose(ws -> { + future.complete(ws); + return future; + }); + } catch (Exception e) { + throw KubernetesClientException.launderThrowable(e); + } + } + + @Override + public JettyWebSocketBuilder subprotocol(String protocol) { + this.subprotocol = protocol; + return this; + } + + private static WebSocketHandshakeException toHandshakeException(UpgradeException ex) { + return new WebSocketHandshakeException(new JettyHttpResponse<>( + new StandardHttpRequest.Builder().uri(ex.getRequestURI()).build(), + new HttpResponse(null, Collections.emptyList()).status(ex.getResponseStatusCode()), + null)) + .initCause(ex); + } +} diff --git a/httpclient-jetty/src/main/resources/META-INF/services/io.fabric8.kubernetes.client.http.HttpClient$Factory b/httpclient-jetty/src/main/resources/META-INF/services/io.fabric8.kubernetes.client.http.HttpClient$Factory new file mode 100644 index 00000000000..d750abff788 --- /dev/null +++ b/httpclient-jetty/src/main/resources/META-INF/services/io.fabric8.kubernetes.client.http.HttpClient$Factory @@ -0,0 +1,17 @@ +# +# Copyright (C) 2015 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +io.fabric8.kubernetes.client.jetty.JettyHttpClientFactory diff --git a/httpclient-jetty/src/test/java/io/fabric8/kubernetes/client/jetty/JettyAsyncBodyTest.java b/httpclient-jetty/src/test/java/io/fabric8/kubernetes/client/jetty/JettyAsyncBodyTest.java new file mode 100644 index 00000000000..ca45f6bd9fa --- /dev/null +++ b/httpclient-jetty/src/test/java/io/fabric8/kubernetes/client/jetty/JettyAsyncBodyTest.java @@ -0,0 +1,27 @@ +/** + * Copyright (C) 2015 Red Hat, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.fabric8.kubernetes.client.jetty; + +import io.fabric8.kubernetes.client.http.AbstractAsyncBodyTest; +import io.fabric8.kubernetes.client.http.HttpClient; + +@SuppressWarnings("java:S2187") +public class JettyAsyncBodyTest extends AbstractAsyncBodyTest { + @Override + protected HttpClient.Factory getHttpClientFactory() { + return new JettyHttpClientFactory(); + } +} diff --git a/httpclient-jetty/src/test/java/io/fabric8/kubernetes/client/jetty/JettyHttpClientBuilderTest.java b/httpclient-jetty/src/test/java/io/fabric8/kubernetes/client/jetty/JettyHttpClientBuilderTest.java new file mode 100644 index 00000000000..6fd3e3d85c2 --- /dev/null +++ b/httpclient-jetty/src/test/java/io/fabric8/kubernetes/client/jetty/JettyHttpClientBuilderTest.java @@ -0,0 +1,49 @@ +/** + * Copyright (C) 2015 Red Hat, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.fabric8.kubernetes.client.jetty; + +import io.fabric8.mockwebserver.DefaultMockServer; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +class JettyHttpClientBuilderTest { + + private static DefaultMockServer server; + + @BeforeAll + static void beforeAll() { + server = new DefaultMockServer(false); + server.start(); + } + + @AfterAll + static void afterAll() { + server.shutdown(); + } + + @Test + void http11() throws Exception { + try (var client = new JettyHttpClientBuilder(null).preferHttp11().build()) { + client.sendAsync(client.newHttpRequestBuilder().uri(server.url("/http-1-1")).build(), String.class).get(); + assertThat(server.getLastRequest()) + .isNotNull() + .hasFieldOrPropertyWithValue("requestLine", "GET /http-1-1 HTTP/1.1"); + } + } +} diff --git a/httpclient-jetty/src/test/java/io/fabric8/kubernetes/client/jetty/JettyHttpClientFactoryTest.java b/httpclient-jetty/src/test/java/io/fabric8/kubernetes/client/jetty/JettyHttpClientFactoryTest.java new file mode 100644 index 00000000000..dcb8403ab03 --- /dev/null +++ b/httpclient-jetty/src/test/java/io/fabric8/kubernetes/client/jetty/JettyHttpClientFactoryTest.java @@ -0,0 +1,44 @@ +/** + * Copyright (C) 2015 Red Hat, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.fabric8.kubernetes.client.jetty; + +import io.fabric8.kubernetes.client.Config; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +class JettyHttpClientFactoryTest { + + @Test + @DisplayName("createHttpClient instantiates a JettyHttpClient") + void createHttpClientInstantiatesJettyHttpClient() { + // When + try (var result = new JettyHttpClientFactory().createHttpClient(Config.empty())) { + // Then + assertThat(result).isNotNull().isInstanceOf(JettyHttpClient.class); + } + } + + @Test + @DisplayName("newBuilder instantiates a JettyHttpClientBuilder") + void newBuilderInstantiatesJettyHttpClientBuilder() { + // When + final var result = new JettyHttpClientFactory().newBuilder(); + // Then + assertThat(result).isNotNull().isInstanceOf(JettyHttpClientBuilder.class); + } +} diff --git a/httpclient-jetty/src/test/java/io/fabric8/kubernetes/client/jetty/JettyHttpClientTest.java b/httpclient-jetty/src/test/java/io/fabric8/kubernetes/client/jetty/JettyHttpClientTest.java new file mode 100644 index 00000000000..2ce2e281b21 --- /dev/null +++ b/httpclient-jetty/src/test/java/io/fabric8/kubernetes/client/jetty/JettyHttpClientTest.java @@ -0,0 +1,157 @@ +/** + * Copyright (C) 2015 Red Hat, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.fabric8.kubernetes.client.jetty; + +import io.fabric8.kubernetes.client.http.TestHttpRequest; +import io.fabric8.kubernetes.client.http.TlsVersion; +import org.eclipse.jetty.client.HttpClient; +import org.eclipse.jetty.websocket.client.WebSocketClient; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; + +import java.time.Duration; +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; + +class JettyHttpClientTest { + + private HttpClient httpClient; + private WebSocketClient webSocketClient; + + @BeforeEach + void setUp() { + httpClient = new HttpClient(); + webSocketClient = new WebSocketClient(); + } + + @AfterEach + void tearDown() throws Exception { + webSocketClient.stop(); + httpClient.stop(); + } + + @Test + @DisplayName("close, should close all underlying clients") + void closeShouldCloseClients() { + try (var jettyHttpClient = new JettyHttpClient( + null, httpClient, webSocketClient, Collections.emptyList(), null)) { + // When + jettyHttpClient.close(); + // Then + assertThat(httpClient.isStopped()).isTrue(); + assertThat(webSocketClient.isStopped()).isTrue(); + } + } + + @Test + @DisplayName("newBuilder instantiates a DerivedJettyHttpClientBuilder") + void newBuilderInstantiatesJettyHttpClientBuilderWithSameSettings() throws Exception { + // Given + final var originalBuilder = new JettyHttpClientBuilder(null); + originalBuilder + .connectTimeout(1337, TimeUnit.SECONDS) + .readTimeout(1337, TimeUnit.SECONDS) + .tlsVersions(TlsVersion.SSL_3_0) + .followAllRedirects(); + try (var firstClient = new JettyHttpClient( + originalBuilder, httpClient, webSocketClient, Collections.emptyList(), null)) { + // When + final var result = firstClient.newBuilder() + .readTimeout(313373, TimeUnit.SECONDS); + // Then + assertThat(result) + .isNotNull() + .isInstanceOf(DerivedJettyHttpClientBuilder.class) + .isNotSameAs(originalBuilder); + final var expected = Map.of( + "tlsVersions", new TlsVersion[]{TlsVersion.SSL_3_0}, + "followAllRedirects", true); + for (var entry : expected.entrySet()) { + final var field = JettyHttpClientBuilder.class.getDeclaredField(entry.getKey()); + field.setAccessible(true); + assertThat(field.get(result)) + .isEqualTo(field.get(originalBuilder)) + .isEqualTo(entry.getValue()); + field.setAccessible(false); + } + var readTimeout = DerivedJettyHttpClientBuilder.class.getDeclaredField("readTimeout"); + readTimeout.setAccessible(true); + assertThat(readTimeout.get(result)).isEqualTo(Duration.ofSeconds(313373)); + assertThat(readTimeout.get(originalBuilder)).isEqualTo(Duration.ofSeconds(1337)); + readTimeout.setAccessible(false); + } + } + + @Test + @DisplayName("sendAsync with unsupported type throws Exception") + void sendAsyncUnsupportedType() { + try (var jettyHttpClient = new JettyHttpClient( + null, httpClient, webSocketClient, Collections.emptyList(), null)) { + // When + final var result = assertThrows(IllegalArgumentException.class, + () -> jettyHttpClient.sendAsync(null, Integer.class)); + // Then + assertThat(result).hasMessage("Unsupported response type: java.lang.Integer"); + } + } + + @Test + @DisplayName("sendAsync with unsupported HttpRequest throws Exception") + void sendAsyncUnsupportedHttpRequest() { + try (var jettyHttpClient = new JettyHttpClient( + new JettyHttpClientBuilder(null), httpClient, webSocketClient, Collections.emptyList(), null)) { + // When + final var request = new TestHttpRequest(); + final var result = assertThrows(IllegalArgumentException.class, + () -> jettyHttpClient.sendAsync(request, String.class)); + // Then + assertThat(result).hasMessage("Only StandardHttpRequest is supported"); + } + } + + @Test + @DisplayName("newWebSocketBuilder instantiates a JettyWebSocketBuilder") + void newWebSocketBuilderInstantiatesJettyWebSocketBuilder() { + try (var jettyHttpClient = new JettyHttpClient( + new JettyHttpClientBuilder(null), httpClient, webSocketClient, Collections.emptyList(), null)) { + // When + final var result = jettyHttpClient.newWebSocketBuilder(); + // Then + assertThat(result).isNotNull().isInstanceOf(JettyWebSocketBuilder.class); + } + } + + @Test + @DisplayName("getFactory returns original factory") + void getFactoryReturnsOriginal() { + // Given + final var factory = new JettyHttpClientFactory(); + try (var jettyHttpClient = new JettyHttpClient( + null, httpClient, webSocketClient, Collections.emptyList(), factory)) { + // When + final var f1 = jettyHttpClient.getFactory(); + final var f2 = jettyHttpClient.getFactory(); + // Then + assertThat(f1).isSameAs(f2).isSameAs(factory); + } + } +} diff --git a/httpclient-jetty/src/test/java/io/fabric8/kubernetes/client/jetty/JettyHttpPostTest.java b/httpclient-jetty/src/test/java/io/fabric8/kubernetes/client/jetty/JettyHttpPostTest.java new file mode 100644 index 00000000000..b8eaa3adae8 --- /dev/null +++ b/httpclient-jetty/src/test/java/io/fabric8/kubernetes/client/jetty/JettyHttpPostTest.java @@ -0,0 +1,27 @@ +/** + * Copyright (C) 2015 Red Hat, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.fabric8.kubernetes.client.jetty; + +import io.fabric8.kubernetes.client.http.AbstractHttpPostTest; +import io.fabric8.kubernetes.client.http.HttpClient; + +@SuppressWarnings("java:S2187") +public class JettyHttpPostTest extends AbstractHttpPostTest { + @Override + protected HttpClient.Factory getHttpClientFactory() { + return new JettyHttpClientFactory(); + } +} diff --git a/httpclient-jetty/src/test/java/io/fabric8/kubernetes/client/jetty/JettyHttpResponseTest.java b/httpclient-jetty/src/test/java/io/fabric8/kubernetes/client/jetty/JettyHttpResponseTest.java new file mode 100644 index 00000000000..3bf62334580 --- /dev/null +++ b/httpclient-jetty/src/test/java/io/fabric8/kubernetes/client/jetty/JettyHttpResponseTest.java @@ -0,0 +1,77 @@ +/** + * Copyright (C) 2015 Red Hat, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.fabric8.kubernetes.client.jetty; + +import org.eclipse.jetty.client.HttpResponse; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.io.ByteArrayInputStream; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.Reader; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.stream.Stream; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.params.provider.Arguments.arguments; + +class JettyHttpResponseTest { + + @Test + void headersHandlesJettyHttpFields() { + // Given + final HttpResponse response = new HttpResponse(null, Collections.emptyList()) + .headers(m -> m + .add("Content-Type", "text/plain") + .add("Content-Length", "1337") + .add("Via", "proxy-1") + .add("Via", "proxy-2")); + // When + final Map> result = new JettyHttpResponse<>(null, response, null) + .headers(); + // Then + assertThat(result) + .hasSize(3) + .containsEntry("Content-Type", Collections.singletonList("text/plain")) + .containsEntry("Content-Length", Collections.singletonList("1337")) + .containsEntry("Via", Arrays.asList("proxy-1", "proxy-2")); + } + + @ParameterizedTest(name = "{index}: SupportedResponse: from type ''{0}'' is ''{1}''") + @MethodSource("supportedResponsesInput") + void supportedResponses(Class type, JettyHttpResponse.SupportedResponse supportedResponse) { + // When + final var result = JettyHttpResponse.SupportedResponse.from(type); + // Then + assertThat(result).isEqualTo(supportedResponse); + } + + static Stream supportedResponsesInput() { + return Stream.of( + arguments(String.class, JettyHttpResponse.SupportedResponse.TEXT), + arguments(InputStream.class, JettyHttpResponse.SupportedResponse.INPUT_STREAM), + arguments(ByteArrayInputStream.class, JettyHttpResponse.SupportedResponse.INPUT_STREAM), + arguments(Reader.class, JettyHttpResponse.SupportedResponse.READER), + arguments(InputStreamReader.class, JettyHttpResponse.SupportedResponse.READER), + arguments(byte[].class, JettyHttpResponse.SupportedResponse.BYTE_ARRAY)); + } +} diff --git a/httpclient-jetty/src/test/java/io/fabric8/kubernetes/client/jetty/JettyInterceptorTest.java b/httpclient-jetty/src/test/java/io/fabric8/kubernetes/client/jetty/JettyInterceptorTest.java new file mode 100644 index 00000000000..afbd6a2bc90 --- /dev/null +++ b/httpclient-jetty/src/test/java/io/fabric8/kubernetes/client/jetty/JettyInterceptorTest.java @@ -0,0 +1,27 @@ +/** + * Copyright (C) 2015 Red Hat, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.fabric8.kubernetes.client.jetty; + +import io.fabric8.kubernetes.client.http.AbstractInterceptorTest; +import io.fabric8.kubernetes.client.http.HttpClient; + +@SuppressWarnings("java:S2187") +public class JettyInterceptorTest extends AbstractInterceptorTest { + @Override + protected HttpClient.Factory getHttpClientFactory() { + return new JettyHttpClientFactory(); + } +} diff --git a/httpclient-jetty/src/test/java/io/fabric8/kubernetes/client/jetty/JettyWebSocketBuilderTest.java b/httpclient-jetty/src/test/java/io/fabric8/kubernetes/client/jetty/JettyWebSocketBuilderTest.java new file mode 100644 index 00000000000..eb84ef5c596 --- /dev/null +++ b/httpclient-jetty/src/test/java/io/fabric8/kubernetes/client/jetty/JettyWebSocketBuilderTest.java @@ -0,0 +1,109 @@ +/** + * Copyright (C) 2015 Red Hat, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.fabric8.kubernetes.client.jetty; + +import io.fabric8.kubernetes.client.http.WebSocket; +import io.fabric8.kubernetes.client.http.WebSocketHandshakeException; +import io.fabric8.mockwebserver.DefaultMockServer; +import org.eclipse.jetty.client.HttpClient; +import org.eclipse.jetty.websocket.client.WebSocketClient; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +import java.net.URI; +import java.time.Duration; +import java.util.Collections; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; + +class JettyWebSocketBuilderTest { + + private static DefaultMockServer server; + + @BeforeAll + static void beforeAll() { + server = new DefaultMockServer(false); + server.start(); + } + + @AfterAll + static void afterAll() { + server.shutdown(); + } + + @Test + void buildAsyncConnectsAndUpgrades() throws Exception { + server.expect().withPath("/websocket-test") + .andUpgradeToWebSocket() + .open() + .done() + .always(); + final var open = new AtomicBoolean(false); + new JettyWebSocketBuilder(new WebSocketClient(new HttpClient()), Duration.ZERO) + .uri(URI.create(server.url("/websocket-test"))) + .buildAsync(new WebSocket.Listener() { + @Override + public void onOpen(WebSocket webSocket) { + open.set(true); + } + }).get(10L, TimeUnit.SECONDS); + assertThat(open).isTrue(); + } + + @Test + void buildAsyncCantUpgradeThrowsWebSocketHandshakeException() { + final var result = assertThrows(ExecutionException.class, + () -> new JettyWebSocketBuilder(new WebSocketClient(new HttpClient()), Duration.ZERO) + .uri(URI.create(server.url("/not-found"))) + .buildAsync(new WebSocket.Listener() { + }) + .get(10L, TimeUnit.SECONDS)); + assertThat(result).cause().isInstanceOf(WebSocketHandshakeException.class); + } + + @Test + void buildAsyncIncludesRequiredHeadersAndPropagatesConfigured() throws Exception { + server.expect().withPath("/websocket-headers-test") + .andUpgradeToWebSocket() + .open() + .done() + .always(); + final var open = new AtomicBoolean(false); + new JettyWebSocketBuilder(new WebSocketClient(new HttpClient()), Duration.ZERO) + .header("A-Random-Header", "A-Random-Value") + .subprotocol("amqp") + .uri(URI.create(server.url("/websocket-headers-test"))) + .buildAsync(new WebSocket.Listener() { + @Override + public void onOpen(WebSocket webSocket) { + open.set(true); + } + }).get(10L, TimeUnit.SECONDS); + assertThat(open).isTrue(); + assertThat(server.getLastRequest().getHeaders().toMultimap()) + .containsEntry("a-random-header", Collections.singletonList("A-Random-Value")) + .containsEntry("sec-websocket-protocol", Collections.singletonList("amqp")) + .containsEntry("connection", Collections.singletonList("Upgrade")) + .containsEntry("upgrade", Collections.singletonList("websocket")) + .containsEntry("sec-websocket-version", Collections.singletonList("13")) + .containsKey("sec-websocket-key"); + } +} diff --git a/httpclient-jetty/src/test/java/io/fabric8/kubernetes/client/jetty/JettyWebSocketSendTest.java b/httpclient-jetty/src/test/java/io/fabric8/kubernetes/client/jetty/JettyWebSocketSendTest.java new file mode 100644 index 00000000000..664936fa2b6 --- /dev/null +++ b/httpclient-jetty/src/test/java/io/fabric8/kubernetes/client/jetty/JettyWebSocketSendTest.java @@ -0,0 +1,74 @@ +/** + * Copyright (C) 2015 Red Hat, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.fabric8.kubernetes.client.jetty; + +import io.fabric8.kubernetes.client.http.WebSocket; +import io.fabric8.mockwebserver.DefaultMockServer; +import org.eclipse.jetty.client.HttpClient; +import org.eclipse.jetty.websocket.client.WebSocketClient; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +import java.net.URI; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; + +import static org.assertj.core.api.Assertions.assertThat; + +class JettyWebSocketSendTest { + + private static DefaultMockServer server; + + @BeforeAll + static void beforeAll() { + server = new DefaultMockServer(false); + server.start(); + } + + @AfterAll + static void afterAll() { + server.shutdown(); + } + + @Test + void sendEmitsMessageToWebSocketServer() throws Exception { + server.expect().withPath("/send-text") + .andUpgradeToWebSocket() + .open() + .expect("GiveMeSomething") + .andEmit("received") + .always() + .done() + .always(); + final BlockingQueue receivedText = new ArrayBlockingQueue<>(1); + final var ws = new JettyWebSocketBuilder(new WebSocketClient(new HttpClient()), Duration.ZERO) + .uri(URI.create(String.format("ws://%s:%s/send-text", server.getHostName(), server.getPort()))) + .buildAsync(new WebSocket.Listener() { + public void onMessage(WebSocket webSocket, String text) { + receivedText.offer(text); + } + }).get(10L, TimeUnit.SECONDS); + ws.send(ByteBuffer.wrap("GiveMeSomething".getBytes(StandardCharsets.UTF_8))); + final var result = receivedText.poll(10L, TimeUnit.SECONDS); + assertThat(result).isEqualTo("received"); + } + +} diff --git a/httpclient-jetty/src/test/java/io/fabric8/kubernetes/client/jetty/JettyWebSocketTest.java b/httpclient-jetty/src/test/java/io/fabric8/kubernetes/client/jetty/JettyWebSocketTest.java new file mode 100644 index 00000000000..150755dd106 --- /dev/null +++ b/httpclient-jetty/src/test/java/io/fabric8/kubernetes/client/jetty/JettyWebSocketTest.java @@ -0,0 +1,249 @@ +/** + * Copyright (C) 2015 Red Hat, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.fabric8.kubernetes.client.jetty; + +import io.fabric8.kubernetes.client.http.WebSocket; +import org.assertj.core.api.InstanceOfAssertFactories; +import org.eclipse.jetty.util.BufferUtil; +import org.eclipse.jetty.websocket.api.Session; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; + +import java.nio.ByteBuffer; +import java.nio.channels.ClosedChannelException; +import java.util.LinkedHashMap; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.entry; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +class JettyWebSocketTest { + + @Test + @DisplayName("Remote WebSocket binary message, notifies first onMessage with no back pressure") + void webSocketBinaryNotifiesOnMessage() { + // Given + final var listener = new Listener(); + // When + new JettyWebSocket(listener).onWebSocketBinary(new byte[] { 1, 3, 3, 7 }, 0, 4); + // Then + assertThat(listener.events) + .containsOnlyKeys("onMessage") + .extracting("onMessage", InstanceOfAssertFactories.type(Object[].class)) + .extracting(o -> o[0], InstanceOfAssertFactories.type(ByteBuffer.class)) + .extracting(BufferUtil::toArray, InstanceOfAssertFactories.type(byte[].class)) + .isEqualTo(new byte[] { 1, 3, 3, 7 }); + } + + @Test + @DisplayName("Remote WebSocket text message, notifies first onMessage with no back pressure") + void webSocketTextNotifiesOnMessage() { + // Given + final var listener = new Listener(); + // When + new JettyWebSocket(listener).onWebSocketText("the message"); + // Then + assertThat(listener.events) + .containsOnlyKeys("onMessage") + .extracting("onMessage", InstanceOfAssertFactories.type(Object[].class)) + .extracting(o -> o[0], InstanceOfAssertFactories.type(String.class)) + .isEqualTo("the message"); + } + + @Test + @DisplayName("Remote WebSocket close, notifies onClose") + void webSocketCloseNotifiesOnClose() { + // Given + final var listener = new Listener(); + // When + new JettyWebSocket(listener).onWebSocketClose(1337, "closed"); + // Then + assertThat(listener.events) + .containsOnly(entry("onClose", new Object[] { 1337, "closed" })); + } + + @Test + @DisplayName("Remote WebSocket connect, notifies onOpen") + void webSocketConnectNotifiesOnOpen() { + // Given + final var listener = new Listener(); + // When + new JettyWebSocket(listener).onWebSocketConnect(null); + // Then + assertThat(listener.events).containsOnlyKeys("onOpen"); + } + + @Test + @DisplayName("Remote WebSocket error, notifies onError") + void webSocketErrorNotifiesOnError() { + // Given + final var listener = new Listener(); + // When + new JettyWebSocket(listener).onWebSocketError(new Exception("WebSocket Error!")); + // Then + assertThat(listener.events) + .containsOnlyKeys("onError") + .extracting("onError", InstanceOfAssertFactories.type(Object[].class)) + .extracting(o -> o[0], InstanceOfAssertFactories.throwable(Exception.class)) + .hasMessage("WebSocket Error!"); + } + + @Test + @DisplayName("Remote WebSocket error, ignored if connection is already closed and is ClosedChannelException") + void webSocketErrorIgnoredWhenClosed() { + // Given + final var listener = new Listener(); + final var jws = new JettyWebSocket(listener); + jws.onWebSocketClose(1000, "closed"); + // When + jws.onWebSocketError(new ClosedChannelException()); + // Then + assertThat(listener.events) + .containsOnlyKeys("onClose"); + } + + @Test + @DisplayName("Remote WebSocket error, notifies onClose if connection is already closed and is NOT ClosedChannelException") + void webSocketErrorNotifiesOnErrorWhenClosedAndNotClosedChannelException() { + // Given + final var listener = new Listener(); + final var jws = new JettyWebSocket(listener); + jws.onWebSocketClose(1000, "closed"); + // When + jws.onWebSocketError(new Exception("NOT ClosedChannelException")); + // Then + assertThat(listener.events) + .containsOnlyKeys("onClose", "onError") + .extracting("onError", InstanceOfAssertFactories.type(Object[].class)) + .extracting(o -> o[0], InstanceOfAssertFactories.throwable(Exception.class)) + .hasMessage("NOT ClosedChannelException"); + } + + @Test + @DisplayName("backPressure, onWebSocketText processes first frame and waits for request() call") + void backPressure() throws Exception { + final var executor = Executors.newSingleThreadExecutor(); + try { + final var buffer = new StringBuffer(); + final var messages = new String[] { "Hell", "o ", "World!" }; + final BlockingQueue lock = new ArrayBlockingQueue<>(3); + final var ws = new JettyWebSocket(new WebSocket.Listener() { + @Override + public void onMessage(WebSocket webSocket, String text) { + buffer.append(text); + } + }); + executor.execute(() -> { + for (var m : messages) { + ws.onWebSocketText(m); + lock.offer(m); + } + }); + lock.poll(1, TimeUnit.SECONDS); + assertThat(buffer).hasToString("Hell"); + ws.request(); + lock.poll(1, TimeUnit.SECONDS); + assertThat(buffer).hasToString("Hello "); + ws.request(); + lock.poll(1, TimeUnit.SECONDS); + assertThat(buffer).hasToString("Hello World!"); + } finally { + executor.shutdownNow(); + } + } + + @Test + @DisplayName("sendClose, sends close message if connection is open") + void sendCloseWhenConnectionIsOpen() { + // Given + final var jws = new JettyWebSocket(new Listener()); + final var session = mock(Session.class); + jws.setWebSocketSession(session); + when(session.isOpen()).thenReturn(true); + // When + jws.sendClose(1000, "Closing"); + // Then + verify(session).close(1000, "Closing"); + } + + @Test + @DisplayName("sendClose, ignored if connection is closed") + void sendCloseIgnoredWhenConnectionIsClosed() { + // Given + final var jws = new JettyWebSocket(new Listener()); + final var session = mock(Session.class); + jws.setWebSocketSession(session); + when(session.isOpen()).thenReturn(false); + // When + jws.sendClose(1000, "Closing"); + // Then + verify(session, times(0)).close(anyInt(), anyString()); + } + + @Test + @DisplayName("sendClose, ignored if connection is already closed") + void sendCloseIgnoredWhenAlreadyClosed() { + // Given + final var jws = new JettyWebSocket(new Listener()); + final var session = mock(Session.class); + jws.setWebSocketSession(session); + when(session.isOpen()).thenReturn(true); + jws.sendClose(1000, "Closing"); + // When + jws.sendClose(1000, "Closing twice"); + // Then + verify(session, times(1)).close(anyInt(), anyString()); + verify(session).close(1000, "Closing"); + } + + private static final class Listener implements WebSocket.Listener { + private final LinkedHashMap events = new LinkedHashMap<>(); + + @Override + public void onOpen(WebSocket webSocket) { + events.put("onOpen", null); + } + + @Override + public void onMessage(WebSocket webSocket, String text) { + events.put("onMessage", new Object[] { text }); + } + + @Override + public void onMessage(WebSocket webSocket, ByteBuffer bytes) { + events.put("onMessage", new Object[] { bytes }); + } + + @Override + public void onClose(WebSocket webSocket, int code, String reason) { + events.put("onClose", new Object[] { code, reason }); + } + + @Override + public void onError(WebSocket webSocket, Throwable error) { + events.put("onError", new Object[] { error }); + } + } +} diff --git a/httpclient-okhttp/pom.xml b/httpclient-okhttp/pom.xml index 85033ca4456..7411b5ee010 100644 --- a/httpclient-okhttp/pom.xml +++ b/httpclient-okhttp/pom.xml @@ -26,8 +26,8 @@ kubernetes-httpclient-okhttp jar - Fabric8 :: Kubernetes :: OkHttp HttpClient - + Fabric8 :: Kubernetes :: HttpClient :: OkHttp + osgi.extender; @@ -58,6 +58,28 @@ logging-interceptor ${okhttp.version} + + + io.fabric8 + kubernetes-client-api + test-jar + test + + + org.junit.jupiter + junit-jupiter-engine + test + + + io.fabric8 + mockwebserver + test + + + org.assertj + assertj-core + test + diff --git a/httpclient-okhttp/src/main/java/io/fabric8/kubernetes/client/okhttp/OkHttpClientBuilderImpl.java b/httpclient-okhttp/src/main/java/io/fabric8/kubernetes/client/okhttp/OkHttpClientBuilderImpl.java index 6172734f4c4..36d37117f72 100644 --- a/httpclient-okhttp/src/main/java/io/fabric8/kubernetes/client/okhttp/OkHttpClientBuilderImpl.java +++ b/httpclient-okhttp/src/main/java/io/fabric8/kubernetes/client/okhttp/OkHttpClientBuilderImpl.java @@ -200,7 +200,7 @@ public Builder proxyAuthorization(String credentials) { } @Override - public Builder tlsVersions(TlsVersion[] tlsVersions) { + public Builder tlsVersions(TlsVersion... tlsVersions) { ConnectionSpec spec = new ConnectionSpec.Builder(ConnectionSpec.MODERN_TLS) .tlsVersions(Arrays.asList(tlsVersions) .stream() @@ -217,4 +217,4 @@ public Builder preferHttp11() { return this; } -} \ No newline at end of file +} diff --git a/httpclient-okhttp/src/main/java/io/fabric8/kubernetes/client/okhttp/OkHttpClientImpl.java b/httpclient-okhttp/src/main/java/io/fabric8/kubernetes/client/okhttp/OkHttpClientImpl.java index 357d1b8cf1b..eb43302bb4a 100644 --- a/httpclient-okhttp/src/main/java/io/fabric8/kubernetes/client/okhttp/OkHttpClientImpl.java +++ b/httpclient-okhttp/src/main/java/io/fabric8/kubernetes/client/okhttp/OkHttpClientImpl.java @@ -166,6 +166,11 @@ public List headers(String key) { return response.headers(key); } + @Override + public Map> headers() { + return response.headers().toMultimap(); + } + } private final okhttp3.OkHttpClient httpClient; diff --git a/httpclient-okhttp/src/main/java/io/fabric8/kubernetes/client/okhttp/OkHttpRequestImpl.java b/httpclient-okhttp/src/main/java/io/fabric8/kubernetes/client/okhttp/OkHttpRequestImpl.java index 7c559740189..cb65a79c643 100644 --- a/httpclient-okhttp/src/main/java/io/fabric8/kubernetes/client/okhttp/OkHttpRequestImpl.java +++ b/httpclient-okhttp/src/main/java/io/fabric8/kubernetes/client/okhttp/OkHttpRequestImpl.java @@ -32,6 +32,7 @@ import java.net.URI; import java.net.URL; import java.util.List; +import java.util.Map; class OkHttpRequestImpl implements HttpRequest { @@ -156,6 +157,11 @@ public List headers(String key) { return request.headers(key); } + @Override + public Map> headers() { + return request.headers().toMultimap(); + } + @Override public String bodyString() { if (request.body() == null) { diff --git a/httpclient-okhttp/src/test/java/io/fabric8/kubernetes/client/okhttp/OkHttpAsyncBodyTest.java b/httpclient-okhttp/src/test/java/io/fabric8/kubernetes/client/okhttp/OkHttpAsyncBodyTest.java new file mode 100644 index 00000000000..a867a9f1a99 --- /dev/null +++ b/httpclient-okhttp/src/test/java/io/fabric8/kubernetes/client/okhttp/OkHttpAsyncBodyTest.java @@ -0,0 +1,27 @@ +/** + * Copyright (C) 2015 Red Hat, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.fabric8.kubernetes.client.okhttp; + +import io.fabric8.kubernetes.client.http.AbstractAsyncBodyTest; +import io.fabric8.kubernetes.client.http.HttpClient; + +@SuppressWarnings("java:S2187") +public class OkHttpAsyncBodyTest extends AbstractAsyncBodyTest { + @Override + protected HttpClient.Factory getHttpClientFactory() { + return new OkHttpClientFactory(); + } +} diff --git a/httpclient-okhttp/src/test/java/io/fabric8/kubernetes/client/okhttp/OkHttpInterceptorTest.java b/httpclient-okhttp/src/test/java/io/fabric8/kubernetes/client/okhttp/OkHttpInterceptorTest.java new file mode 100644 index 00000000000..35155c6b403 --- /dev/null +++ b/httpclient-okhttp/src/test/java/io/fabric8/kubernetes/client/okhttp/OkHttpInterceptorTest.java @@ -0,0 +1,40 @@ +/** + * Copyright (C) 2015 Red Hat, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.fabric8.kubernetes.client.okhttp; + +import io.fabric8.kubernetes.client.http.AbstractInterceptorTest; +import io.fabric8.kubernetes.client.http.HttpClient; +import org.junit.jupiter.api.Disabled; + +@SuppressWarnings("java:S2187") +public class OkHttpInterceptorTest extends AbstractInterceptorTest { + @Override + protected HttpClient.Factory getHttpClientFactory() { + return new OkHttpClientFactory(); + } + + // TODO: Check implementation + @Disabled("TODO: Check implementation") + @Override + public void afterHttpFailureReplacesResponseInConsumeLines() { + } + + // TODO: Check implementation + @Disabled("TODO: Check implementation") + @Override + public void afterHttpFailureReplacesResponseInConsumeBytes() { + } +} diff --git a/httpclient-okhttp/src/test/java/io/fabric8/kubernetes/client/okhttp/OkHttpPostTest.java b/httpclient-okhttp/src/test/java/io/fabric8/kubernetes/client/okhttp/OkHttpPostTest.java new file mode 100644 index 00000000000..4f2d00009dc --- /dev/null +++ b/httpclient-okhttp/src/test/java/io/fabric8/kubernetes/client/okhttp/OkHttpPostTest.java @@ -0,0 +1,27 @@ +/** + * Copyright (C) 2015 Red Hat, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.fabric8.kubernetes.client.okhttp; + +import io.fabric8.kubernetes.client.http.AbstractHttpPostTest; +import io.fabric8.kubernetes.client.http.HttpClient; + +@SuppressWarnings("java:S2187") +public class OkHttpPostTest extends AbstractHttpPostTest { + @Override + protected HttpClient.Factory getHttpClientFactory() { + return new OkHttpClientFactory(); + } +} diff --git a/httpclient-tests/pom.xml b/httpclient-tests/pom.xml index 2ec810ce9a6..2b8b56aded9 100644 --- a/httpclient-tests/pom.xml +++ b/httpclient-tests/pom.xml @@ -41,6 +41,10 @@ io.fabric8 kubernetes-httpclient-jdk + + io.fabric8 + kubernetes-httpclient-jetty + io.fabric8 kubernetes-client @@ -50,14 +54,21 @@ kubernetes-server-mock test + + org.junit.jupiter + junit-jupiter-engine + + + org.junit.jupiter + junit-jupiter-params + org.awaitility awaitility - org.junit.jupiter - junit-jupiter-engine - test + org.assertj + assertj-core diff --git a/httpclient-tests/src/test/java/io/fabric8/kubernetes/client/http/JettyHttpClientTest.java b/httpclient-tests/src/test/java/io/fabric8/kubernetes/client/http/JettyHttpClientTest.java new file mode 100644 index 00000000000..a8c88e36f44 --- /dev/null +++ b/httpclient-tests/src/test/java/io/fabric8/kubernetes/client/http/JettyHttpClientTest.java @@ -0,0 +1,27 @@ +/** + * Copyright (C) 2015 Red Hat, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.fabric8.kubernetes.client.http; + +import io.fabric8.kubernetes.client.jetty.JettyHttpClientFactory; + +public class JettyHttpClientTest extends OkHttpClientTest { + + @Override + protected HttpClient.Factory getHttpClientFactory() { + return new JettyHttpClientFactory(); + } + +} diff --git a/httpclient-tests/src/test/java/io/fabric8/kubernetes/client/http/OkHttpClientTest.java b/httpclient-tests/src/test/java/io/fabric8/kubernetes/client/http/OkHttpClientTest.java index 463fb900c17..4ef2f18697c 100644 --- a/httpclient-tests/src/test/java/io/fabric8/kubernetes/client/http/OkHttpClientTest.java +++ b/httpclient-tests/src/test/java/io/fabric8/kubernetes/client/http/OkHttpClientTest.java @@ -23,24 +23,24 @@ import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; -import java.io.IOException; import java.io.InputStream; import java.io.Reader; import java.net.URI; -import java.nio.ByteBuffer; -import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.junit.jupiter.api.Assertions.fail; /** * Tests a {@link HttpClient} at or below the {@link KubernetesClient} level. @@ -96,26 +96,20 @@ void testOnOpen() throws Exception { .open() .done().always(); - CompletableFuture onOpen = new CompletableFuture(); - - CompletableFuture startedFuture = client.getHttpClient().newWebSocketBuilder() + final CompletableFuture opened = new CompletableFuture<>(); + final CompletableFuture future = client.getHttpClient().newWebSocketBuilder() .uri(URI.create(client.getConfiguration().getMasterUrl() + "foo")) .buildAsync(new Listener() { @Override public void onOpen(WebSocket webSocket) { - onOpen.complete(null); + opened.complete(true); } - }); + }) + .thenCompose(ws -> opened); - // make sure onOpen has completed before this future - startedFuture.handle((w, t) -> { - assertTrue(onOpen.isDone()); - return null; - }); - - startedFuture.get(10, TimeUnit.SECONDS); + assertThat(future.get(10, TimeUnit.SECONDS)).isTrue(); } @Test @@ -159,14 +153,9 @@ void testAsyncBody() throws Exception { CompletableFuture> responseFuture = client.getHttpClient().consumeBytes( client.getHttpClient().newHttpRequestBuilder().uri(URI.create(client.getConfiguration().getMasterUrl() + "async")) .build(), - new HttpClient.BodyConsumer>() { - - @Override - public void consume(List value, AsyncBody asyncBody) throws Exception { - consumed.complete(true); - asyncBody.consume(); - } - + (value, asyncBody) -> { + consumed.complete(true); + asyncBody.consume(); }); responseFuture.whenComplete((r, t) -> { @@ -189,28 +178,18 @@ public void consume(List value, AsyncBody asyncBody) throws Exceptio assertTrue(consumed.get(5, TimeUnit.SECONDS)); } - @Test - void testSupportedTypes() throws Exception { + @DisplayName("Supported response body types") + @ParameterizedTest(name = "{index}: {0}") + @ValueSource(classes = { String.class, byte[].class, Reader.class, InputStream.class }) + void testSupportedTypes(Class type) throws Exception { server.expect().withPath("/type").andReturn(200, "hello world").always(); - - testType(String.class); - testType(byte[].class); - testType(Reader.class); - testType(InputStream.class); - } - - private void testType(Class type) throws Exception { - client.getHttpClient() + final HttpResponse result = client.getHttpClient() .sendAsync(client.getHttpClient().newHttpRequestBuilder() .uri(URI.create(client.getConfiguration().getMasterUrl() + "type")).build(), type) - .whenComplete((r, t) -> { - assertTrue(type.isAssignableFrom(r.body().getClass())); - try { - assertEquals("hello world", r.bodyString()); - } catch (IOException e) { - fail(e); - } - }).get(); + .get(10, TimeUnit.SECONDS); + assertThat(result) + .satisfies(r -> assertThat(r.body()).isInstanceOf(type)) + .satisfies(r -> assertThat(r.bodyString()).isEqualTo("hello world")); } } diff --git a/kubernetes-client-api/pom.xml b/kubernetes-client-api/pom.xml index 50de747b8a3..0bcc3c4c120 100644 --- a/kubernetes-client-api/pom.xml +++ b/kubernetes-client-api/pom.xml @@ -196,7 +196,6 @@ org.slf4j slf4j-simple - ${slf4j.version} test @@ -209,6 +208,11 @@ assertj-core test + + io.fabric8 + mockwebserver + test + org.awaitility awaitility diff --git a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/AbstractBasicBuilder.java b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/AbstractBasicBuilder.java new file mode 100644 index 00000000000..687f5a50532 --- /dev/null +++ b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/AbstractBasicBuilder.java @@ -0,0 +1,66 @@ +/** + * Copyright (C) 2015 Red Hat, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.fabric8.kubernetes.client.http; + +import java.net.URI; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public abstract class AbstractBasicBuilder implements BasicBuilder { + + private URI uri; + private final Map> headers = new HashMap<>(); + + @Override + public T uri(URI uri) { + this.uri = uri; + return (T) this; + } + + @Override + public T header(String name, String value) { + headers.compute(name, (k, v) -> { + if (v == null) { + v = new ArrayList<>(); + } + v.add(value); + return v; + }); + return (T) this; + } + + @Override + public T setHeader(String name, String value) { + headers.put(name, new ArrayList<>(Collections.singletonList(value))); + return (T) this; + } + + protected final URI getUri() { + return uri; + } + + protected final Map> getHeaders() { + return headers; + } + + protected final void setHeaders(Map> headers) { + this.headers.clear(); + this.headers.putAll(headers); + } +} diff --git a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/HttpClient.java b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/HttpClient.java index 4802c291454..88bc7b9177e 100644 --- a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/HttpClient.java +++ b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/HttpClient.java @@ -54,34 +54,65 @@ interface DerivedClientBuilder { DerivedClientBuilder readTimeout(long readTimeout, TimeUnit unit); - DerivedClientBuilder forStreaming(); - - DerivedClientBuilder authenticatorNone(); + DerivedClientBuilder writeTimeout(long writeTimeout, TimeUnit unit); - DerivedClientBuilder writeTimeout(long timeout, TimeUnit timeoutUnit); + /** + * Sets the HttpClient to be used to perform HTTP requests whose responses + * will be streamed. + * + * @return this Builder instance. + */ + DerivedClientBuilder forStreaming(); DerivedClientBuilder addOrReplaceInterceptor(String name, Interceptor interceptor); + + /** + * Prevents any built-in authenticator to respond to challenges from origin server. + *

+ * OkHttp specific option. + * + * @return this Builder instance. + */ + DerivedClientBuilder authenticatorNone(); } interface Builder extends DerivedClientBuilder { + /** + * {@inheritDoc} + */ @Override HttpClient build(); + /** + * {@inheritDoc} + */ @Override Builder readTimeout(long readTimeout, TimeUnit unit); + /** + * {@inheritDoc} + */ + @Override + Builder writeTimeout(long writeTimeout, TimeUnit unit); + Builder connectTimeout(long connectTimeout, TimeUnit unit); + /** + * {@inheritDoc} + */ @Override Builder forStreaming(); - @Override - Builder writeTimeout(long timeout, TimeUnit timeoutUnit); - + /** + * {@inheritDoc} + */ @Override Builder addOrReplaceInterceptor(String name, Interceptor interceptor); + /** + * {@inheritDoc} + */ @Override Builder authenticatorNone(); @@ -93,7 +124,7 @@ interface Builder extends DerivedClientBuilder { Builder proxyAuthorization(String credentials); - Builder tlsVersions(TlsVersion[] tlsVersions); + Builder tlsVersions(TlsVersion... tlsVersions); Builder preferHttp11(); } diff --git a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/HttpHeaders.java b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/HttpHeaders.java index 727c6827b4c..1a89e6a84a5 100644 --- a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/HttpHeaders.java +++ b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/HttpHeaders.java @@ -17,15 +17,23 @@ package io.fabric8.kubernetes.client.http; import java.util.List; +import java.util.Map; public interface HttpHeaders { /** * Returns a List of all the Header String values for the provided key/name. - * + * * @param key The header key/name for which to provide the values. * @return the List of header values for the provided key. */ List headers(String key); + /** + * Returns a Map containing a list of String values for each Header. + * + * @return the Map of Headers and their list of values. + */ + Map> headers(); + } diff --git a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/StandardHttpHeaders.java b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/StandardHttpHeaders.java new file mode 100644 index 00000000000..67ae5272265 --- /dev/null +++ b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/StandardHttpHeaders.java @@ -0,0 +1,44 @@ +/** + * Copyright (C) 2015 Red Hat, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.fabric8.kubernetes.client.http; + +import java.util.Collections; +import java.util.List; +import java.util.Map; + +public class StandardHttpHeaders implements HttpHeaders { + + public static final String CONTENT_TYPE = "Content-Type"; + public static final String CONTENT_LENGTH = "Content-Length"; + public static final String EXPECT = "Expect"; + public static final String EXPECT_CONTINUE = "100-Continue"; + + private final Map> headers; + + public StandardHttpHeaders(Map> headers) { + this.headers = headers; + } + + @Override + public List headers(String key) { + return Collections.unmodifiableList(headers.getOrDefault(key, Collections.emptyList())); + } + + @Override + public Map> headers() { + return Collections.unmodifiableMap(headers); + } +} diff --git a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/StandardHttpRequest.java b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/StandardHttpRequest.java new file mode 100644 index 00000000000..0893f8d7b54 --- /dev/null +++ b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/StandardHttpRequest.java @@ -0,0 +1,152 @@ +/** + * Copyright (C) 2015 Red Hat, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.fabric8.kubernetes.client.http; + +import io.fabric8.kubernetes.client.KubernetesClientException; + +import java.io.ByteArrayInputStream; +import java.io.InputStream; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.URL; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +public class StandardHttpRequest extends StandardHttpHeaders implements HttpRequest { + + public static final String METHOD_POST = "POST"; + + private final URI uri; + private final String method; + private final String bodyString; + private final InputStream bodyStream; + + public StandardHttpRequest(Map> headers, URI uri, String method, String bodyString, + InputStream bodyStream) { + super(headers); + this.uri = uri; + this.method = method; + this.bodyString = bodyString; + this.bodyStream = bodyStream; + } + + @Override + public URI uri() { + return uri; + } + + @Override + public String method() { + return method; + } + + /** + * {@inheritDoc} + */ + @Override + public String bodyString() { + return bodyString; + } + + /** + * Return the body as a string, but only if one of the byte[] or InputStream valued {@link HttpRequest.Builder} + * methods were used otherwise null. + * + * @return the body as InputStream. + */ + public InputStream bodyStream() { + return bodyStream; + } + + public Builder toBuilder() { + return new Builder(this); + } + + public static final class Builder extends AbstractBasicBuilder implements HttpRequest.Builder { + + private String method = "GET"; + private InputStream bodyAsStream; + private String bodyAsString; + + public Builder() { + } + + public Builder(StandardHttpRequest original) { + super.uri(original.uri()); + super.setHeaders(original.headers()); + method = original.method; + bodyAsString = original.bodyString; + bodyAsStream = original.bodyStream; + } + + @Override + public StandardHttpRequest build() { + return new StandardHttpRequest( + getHeaders(), Objects.requireNonNull(getUri()), method, bodyAsString, bodyAsStream); + } + + @Override + public HttpRequest.Builder uri(String uri) { + return super.uri(URI.create(uri)); + } + + @Override + public HttpRequest.Builder url(URL url) { + try { + return super.uri(url.toURI()); + } catch (URISyntaxException e) { + throw KubernetesClientException.launderThrowable(e); + } + } + + @Override + public HttpRequest.Builder post(String contentType, byte[] writeValueAsBytes) { + return post(contentType, new ByteArrayInputStream(writeValueAsBytes), writeValueAsBytes.length); + } + + @Override + public HttpRequest.Builder post(String contentType, InputStream stream, long length) { + if (length >= 0) { + header(CONTENT_LENGTH, Long.toString(length)); + } + method = METHOD_POST; + contentType(contentType); + bodyAsStream = stream; + return this; + } + + @Override + public HttpRequest.Builder method(String method, String contentType, String body) { + this.method = method; + contentType(contentType); + this.bodyAsString = body; + return this; + } + + private void contentType(String contentType) { + if (contentType != null) { + setHeader(CONTENT_TYPE, contentType); + } + } + + @Override + public HttpRequest.Builder expectContinue() { + setHeader(EXPECT, EXPECT_CONTINUE); + return this; + } + } +} diff --git a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/StandardMediaTypes.java b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/StandardMediaTypes.java new file mode 100644 index 00000000000..5efdd75463f --- /dev/null +++ b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/StandardMediaTypes.java @@ -0,0 +1,25 @@ +/** + * Copyright (C) 2015 Red Hat, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.fabric8.kubernetes.client.http; + +public class StandardMediaTypes { + + public static final String APPLICATION_OCTET_STREAM = "application/octet-stream"; + public static final String TEXT_PLAIN = "text/plain"; + + private StandardMediaTypes() { + } +} diff --git a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/WebSocket.java b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/WebSocket.java index a8ceaf5f5a1..9d2c32ec59a 100644 --- a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/WebSocket.java +++ b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/WebSocket.java @@ -74,6 +74,21 @@ interface Builder extends BasicBuilder { */ CompletableFuture buildAsync(Listener listener); + /** + * Protocol used for WebSocket message exchange. + * + *

+ * The client can request that the server use a specific subprotocol by + * including the |Sec-WebSocket-Protocol| field in its handshake. If it + * is specified, the server needs to include the same field and one of + * the selected subprotocol values in its response for the connection to + * be established. + *
+ * RFC 6455: Section 1.9, Subprotocols Using the WebSocket Protocol + * + * @param protocol the protocol to be used. + * @return this builder. + */ Builder subprotocol(String protocol); @Override @@ -122,4 +137,23 @@ interface Builder extends BasicBuilder { */ void request(); + /** + * Converts http or https URIs to ws or wss URIs. + *

+ * Clients such as JDK or Jetty require URIs to be performed to the ws protocol, other clients perform + * this same transformation automatically. + * + * @param httpUri the original URI to transform. + * @return a new URI with the converted protocol (if applicable). + */ + static URI toWebSocketUri(URI httpUri) { + if (httpUri != null && httpUri.getScheme().startsWith("http")) { + // the jdk logic expects a ws uri + // after the https://bugs.java.com/bugdatabase/view_bug.do?bug_id=8245245 it just does the reverse of this + // to convert back to http(s) ... + return URI.create("ws" + httpUri.toString().substring(4)); + } + return httpUri; + } + } diff --git a/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/http/AbstractAsyncBodyTest.java b/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/http/AbstractAsyncBodyTest.java new file mode 100644 index 00000000000..f15715f5221 --- /dev/null +++ b/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/http/AbstractAsyncBodyTest.java @@ -0,0 +1,116 @@ +/** + * Copyright (C) 2015 Red Hat, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.fabric8.kubernetes.client.http; + +import io.fabric8.mockwebserver.DefaultMockServer; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; + +import java.nio.CharBuffer; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public abstract class AbstractAsyncBodyTest { + + private static DefaultMockServer server; + + @BeforeAll + static void beforeAll() { + server = new DefaultMockServer(false); + server.start(); + } + + @AfterAll + static void afterAll() { + server.shutdown(); + } + + protected abstract HttpClient.Factory getHttpClientFactory(); + + @Test + @DisplayName("Lines are processed and consumed only after the consume() invocation") + public void consumeLinesProcessedAfterConsume() throws Exception { + try (final HttpClient client = getHttpClientFactory().newBuilder().build()) { + server.expect().withPath("/consume-lines") + .andReturn(200, "This is the response body\n") + .always(); + final StringBuffer responseText = new StringBuffer(); + final HttpResponse asyncBodyResponse = client.consumeLines( + client.newHttpRequestBuilder().uri(server.url("/consume-lines")).build(), + (value, asyncBody) -> { + responseText.append(value); + asyncBody.done().complete(null); // OkHttp requires this, not sure if it should + }) + .get(10L, TimeUnit.SECONDS); + assertThat(responseText).isEmpty(); + asyncBodyResponse.body().consume(); + asyncBodyResponse.body().done().get(10L, TimeUnit.SECONDS); + assertThat(responseText).contains("This is the response body"); + } + } + + @Test + @DisplayName("Lines are not processed when cancel() invocation") + public void consumeLinesNotProcessedIfCancelled() throws Exception { + try (final HttpClient client = getHttpClientFactory().newBuilder().build()) { + server.expect().withPath("/cancel") + .andReturn(200, "This would be the response body") + .always(); + final StringBuffer responseText = new StringBuffer(); + final HttpResponse asyncBodyResponse = client + .consumeLines(client.newHttpRequestBuilder() + .uri(server.url("/cancel")).build(), (value, asyncBody) -> responseText.append(value)) + .get(10L, TimeUnit.SECONDS); + asyncBodyResponse.body().cancel(); + asyncBodyResponse.body().consume(); + final CompletableFuture doneFuture = asyncBodyResponse.body().done(); + assertThrows(CancellationException.class, () -> doneFuture.get(10L, TimeUnit.SECONDS)); + assertThat(responseText).isEmpty(); + } + } + + @Test + @DisplayName("Bytes are processed and consumed only after the consume() invocation") + public void consumeByteBufferLinesProcessedAfterConsume() throws Exception { + try (final HttpClient client = getHttpClientFactory().newBuilder().build()) { + server.expect().withPath("/consume-bytes") + .andReturn(200, "This is the response body as bytes") + .always(); + final StringBuffer responseText = new StringBuffer(); + final HttpResponse asyncBodyResponse = client.consumeBytes( + client.newHttpRequestBuilder().uri(server.url("/consume-bytes")).build(), + (value, asyncBody) -> { + responseText.append(value.stream().map(StandardCharsets.UTF_8::decode) + .map(CharBuffer::toString).collect(Collectors.joining())); + asyncBody.done().complete(null); // OkHttp requires this, not sure if it should + }) + .get(10L, TimeUnit.SECONDS); + assertThat(responseText).isEmpty(); + asyncBodyResponse.body().consume(); + asyncBodyResponse.body().done().get(10L, TimeUnit.SECONDS); + assertThat(responseText).contains("This is the response body as bytes"); + } + } + +} diff --git a/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/http/AbstractHttpPostTest.java b/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/http/AbstractHttpPostTest.java new file mode 100644 index 00000000000..36b89538d64 --- /dev/null +++ b/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/http/AbstractHttpPostTest.java @@ -0,0 +1,107 @@ +/** + * Copyright (C) 2015 Red Hat, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.fabric8.kubernetes.client.http; + +import io.fabric8.mockwebserver.DefaultMockServer; +import okhttp3.mockwebserver.RecordedRequest; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; + +import java.io.ByteArrayInputStream; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.TimeUnit; + +import static org.assertj.core.api.Assertions.assertThat; + +public abstract class AbstractHttpPostTest { + + private static DefaultMockServer server; + + @BeforeAll + static void beforeAll() { + server = new DefaultMockServer(false); + server.start(); + } + + @AfterAll + static void afterAll() { + server.shutdown(); + } + + protected abstract HttpClient.Factory getHttpClientFactory(); + + @Test + @DisplayName("String body, should receive a POST request with body") + public void postStringBody() throws Exception { + // When + try (HttpClient client = getHttpClientFactory().newBuilder().build()) { + client + .sendAsync(client.newHttpRequestBuilder() + .uri(server.url("/post-string")) + .post("text/plain", "A string body") + .build(), String.class) + .get(10L, TimeUnit.SECONDS); + } + // Then + assertThat(server.getLastRequest()) + .returns("POST", RecordedRequest::getMethod) + .returns("A string body", rr -> rr.getBody().readUtf8()) + .extracting(rr -> rr.getHeader("Content-Type")).asString() + .startsWith("text/plain"); + } + + @Test + @DisplayName("InputStream body, should receive a POST request with body") + public void postInputStreamBody() throws Exception { + // When + try (HttpClient client = getHttpClientFactory().newBuilder().build()) { + client + .sendAsync(client.newHttpRequestBuilder() + .uri(server.url("/post-input-stream")) + .post("text/plain", new ByteArrayInputStream("A string body".getBytes(StandardCharsets.UTF_8)), -1) + .build(), String.class) + .get(10L, TimeUnit.SECONDS); + } + // Then + assertThat(server.getLastRequest()) + .returns("POST", RecordedRequest::getMethod) + .returns("A string body", rr -> rr.getBody().readUtf8()) + .extracting(rr -> rr.getHeader("Content-Type")).asString() + .startsWith("text/plain"); + } + + @Test + @DisplayName("byte[] body, should receive a POST request with body") + public void postBytesBody() throws Exception { + // When + try (HttpClient client = getHttpClientFactory().newBuilder().build()) { + client + .sendAsync(client.newHttpRequestBuilder() + .uri(server.url("/post-bytes")) + .post("text/plain", "A string body".getBytes(StandardCharsets.UTF_8)) + .build(), String.class) + .get(10L, TimeUnit.SECONDS); + } + // Then + assertThat(server.getLastRequest()) + .returns("POST", RecordedRequest::getMethod) + .returns("A string body", rr -> rr.getBody().readUtf8()) + .extracting(rr -> rr.getHeader("Content-Type")).asString() + .startsWith("text/plain"); + } +} diff --git a/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/http/AbstractInterceptorTest.java b/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/http/AbstractInterceptorTest.java new file mode 100644 index 00000000000..958abd804c5 --- /dev/null +++ b/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/http/AbstractInterceptorTest.java @@ -0,0 +1,175 @@ +/** + * Copyright (C) 2015 Red Hat, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.fabric8.kubernetes.client.http; + +import io.fabric8.mockwebserver.DefaultMockServer; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; + +import java.net.URI; +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import static org.assertj.core.api.Assertions.assertThat; + +public abstract class AbstractInterceptorTest { + + private static DefaultMockServer server; + + @BeforeAll + static void beforeAll() { + server = new DefaultMockServer(false); + server.start(); + } + + @AfterAll + static void afterAll() { + server.shutdown(); + } + + protected abstract HttpClient.Factory getHttpClientFactory(); + + @Test + @DisplayName("before, should add a header to the HTTP request") + public void beforeAddsHeaderToRequest() throws Exception { + // Given + final HttpClient.Builder builder = getHttpClientFactory().newBuilder() + .addOrReplaceInterceptor("test", new Interceptor() { + @Override + public void before(BasicBuilder builder, HttpHeaders headers) { + builder.header("Test-Header", "Test-Value"); + } + }); + // When + try (HttpClient client = builder.build()) { + client.sendAsync(client.newHttpRequestBuilder().uri(server.url("/intercept-before")).build(), String.class) + .get(10L, TimeUnit.SECONDS); + } + // Then + assertThat(server.getLastRequest().getHeaders().toMultimap()) + .containsEntry("test-header", Collections.singletonList("Test-Value")); + } + + @Test + @DisplayName("afterFailure (HTTP), replaces the HttpResponse produced by HttpClient.sendAsync") + public void afterHttpFailureReplacesResponseInSendAsync() throws Exception { + // Given + server.expect().withPath("/intercepted-url").andReturn(200, "This works").always(); + final HttpClient.Builder builder = getHttpClientFactory().newBuilder() + .addOrReplaceInterceptor("test", new Interceptor() { + @Override + public CompletableFuture afterFailure(BasicBuilder builder, HttpResponse response) { + builder.uri(URI.create(server.url("/intercepted-url"))); + return CompletableFuture.completedFuture(true); + } + }); + // When + try (HttpClient client = builder.build()) { + final HttpResponse result = client + .sendAsync(client.newHttpRequestBuilder().uri(server.url("/not-found")).build(), String.class) + .get(10L, TimeUnit.SECONDS); + // Then + assertThat(result) + .returns("This works", HttpResponse::body) + .returns(200, HttpResponse::code); + } + } + + @Test + @DisplayName("afterFailure (HTTP), replaces the HttpResponse produced by HttpClient.consumeLines") + public void afterHttpFailureReplacesResponseInConsumeLines() throws Exception { + // Given + server.expect().withPath("/intercepted-url").andReturn(200, "This works").always(); + final HttpClient.Builder builder = getHttpClientFactory().newBuilder() + .addOrReplaceInterceptor("test", new Interceptor() { + @Override + public CompletableFuture afterFailure(BasicBuilder builder, HttpResponse response) { + builder.uri(URI.create(server.url("/intercepted-url"))); + return CompletableFuture.completedFuture(true); + } + }); + final AtomicReference result = new AtomicReference<>(); + // When + try (HttpClient client = builder.build()) { + final HttpResponse asyncR = client.consumeLines( + client.newHttpRequestBuilder().uri(server.url("/not-found")).build(), (s, ab) -> result.set(s)) + .get(10L, TimeUnit.SECONDS); + asyncR.body().consume(); + asyncR.body().done().get(10L, TimeUnit.SECONDS); + // Then + assertThat(result).hasValue("This works"); + } + } + + @Test + @DisplayName("afterFailure (HTTP), replaces the HttpResponse produced by HttpClient.consumeBytes") + public void afterHttpFailureReplacesResponseInConsumeBytes() throws Exception { + // Given + server.expect().withPath("/intercepted-url").andReturn(200, "This works").always(); + final HttpClient.Builder builder = getHttpClientFactory().newBuilder() + .addOrReplaceInterceptor("test", new Interceptor() { + @Override + public CompletableFuture afterFailure(BasicBuilder builder, HttpResponse response) { + builder.uri(URI.create(server.url("/intercepted-url"))); + return CompletableFuture.completedFuture(true); + } + }); + final AtomicReference result = new AtomicReference<>(); + // When + try (HttpClient client = builder.build()) { + final HttpResponse asyncR = client.consumeBytes( + client.newHttpRequestBuilder().uri(server.url("/not-found")).build(), + (s, ab) -> result.set(StandardCharsets.UTF_8.decode(s.iterator().next()).toString())) + .get(10L, TimeUnit.SECONDS); + asyncR.body().consume(); + asyncR.body().done().get(10L, TimeUnit.SECONDS); + // Then + assertThat(result).hasValue("This works"); + } + } + + @Test + @DisplayName("interceptors should be applied in the order they were added") + public void interceptorsAreAppliedInOrder() throws Exception { + // Given + final HttpClient.Builder builder = getHttpClientFactory().newBuilder() + .addOrReplaceInterceptor("first", new Interceptor() { + @Override + public void before(BasicBuilder builder, HttpHeaders headers) { + builder.header("Test-Header", "Test-Value"); + } + }) + .addOrReplaceInterceptor("second", new Interceptor() { + @Override + public void before(BasicBuilder builder, HttpHeaders headers) { + builder.setHeader("Test-Header", "Test-Value-Override"); + } + }); + // When + try (HttpClient client = builder.build()) { + client.sendAsync(client.newHttpRequestBuilder().uri(server.url("/intercept-before")).build(), String.class) + .get(10L, TimeUnit.SECONDS); + } + // Then + assertThat(server.getLastRequest().getHeaders().toMultimap()) + .containsEntry("test-header", Collections.singletonList("Test-Value-Override")); + } +} diff --git a/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/http/StandardHttpRequestTest.java b/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/http/StandardHttpRequestTest.java new file mode 100644 index 00000000000..1a9beb72cfb --- /dev/null +++ b/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/http/StandardHttpRequestTest.java @@ -0,0 +1,107 @@ +/** + * Copyright (C) 2015 Red Hat, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.fabric8.kubernetes.client.http; + +import io.fabric8.kubernetes.client.utils.IOHelpers; +import org.assertj.core.api.InstanceOfAssertFactories; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.io.InputStream; +import java.net.URI; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.entry; + +class StandardHttpRequestTest { + + @Test + void builderMethod() { + // Given + final HttpRequest.Builder b = new StandardHttpRequest.Builder() + .header("h1", "v1") + .header("h1", "v2") + .uri("https://example.com/aitana") + .expectContinue() + .method("PUT", "application/json", "{\"v\":true}"); + // When + final HttpRequest result = b.build(); + // Then + assertThat(result) + .isInstanceOf(StandardHttpRequest.class) + .returns(URI.create("https://example.com/aitana"), HttpRequest::uri) + .returns("PUT", HttpRequest::method) + .returns("{\"v\":true}", HttpRequest::bodyString) + .extracting(HttpHeaders::headers, InstanceOfAssertFactories.map(String.class, List.class)) + .containsOnly( + entry("h1", Arrays.asList("v1", "v2")), + entry("Content-Type", Collections.singletonList("application/json")), + entry("Expect", Collections.singletonList("100-Continue"))); + } + + @Test + void builderPostBytes() { + // Given + final HttpRequest.Builder b = new StandardHttpRequest.Builder() + .uri("https://example.com/alex") + .post("text/plain", "The bytes".getBytes(StandardCharsets.UTF_8)); + // When + final HttpRequest result = b.build(); + // Then + assertThat(result) + .isInstanceOf(StandardHttpRequest.class) + .asInstanceOf(InstanceOfAssertFactories.type(StandardHttpRequest.class)) + .returns(URI.create("https://example.com/alex"), HttpRequest::uri) + .returns("POST", HttpRequest::method) + .returns("The bytes", r -> toString(r.bodyStream())) + .extracting(HttpHeaders::headers, InstanceOfAssertFactories.map(String.class, List.class)) + .containsOnly( + entry("Content-Type", Collections.singletonList("text/plain")), + entry("Content-Length", Collections.singletonList("9"))); + } + + @Test + void toBuilderReturnsNewBuilderWithPreservedSettings() { + // Given + final HttpRequest.Builder b = new StandardHttpRequest.Builder() + .header("h1", "v1") + .uri("https://example.com/julia"); + // When + final HttpRequest.Builder result = ((StandardHttpRequest) b.build()).toBuilder(); + // Then + assertThat(result) + .isNotSameAs(b) + .extracting(HttpRequest.Builder::build) + .isInstanceOf(StandardHttpRequest.class) + .returns(URI.create("https://example.com/julia"), HttpRequest::uri) + .returns("GET", HttpRequest::method) + .extracting(HttpHeaders::headers, InstanceOfAssertFactories.map(String.class, List.class)) + .containsOnly( + entry("h1", Collections.singletonList("v1"))); + } + + static String toString(InputStream is) { + try { + return IOHelpers.readFully(is); + } catch (IOException e) { + throw new RuntimeException(e); + } + } +} diff --git a/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/http/TestHttpHeaders.java b/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/http/TestHttpHeaders.java index 17573871dd3..48101b1622a 100644 --- a/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/http/TestHttpHeaders.java +++ b/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/http/TestHttpHeaders.java @@ -23,7 +23,7 @@ /** * Basic {@link HttpHeaders} implementation to be used in tests instead of mocks or real headers. - * + * * @param type for the return type of chained methods. */ public class TestHttpHeaders implements HttpHeaders { @@ -35,6 +35,11 @@ public List headers(String key) { return headers.getOrDefault(key, Collections.emptyList()); } + @Override + public Map> headers() { + return headers; + } + public T clearHeaders() { headers.clear(); return (T) this; diff --git a/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/http/WebSocketTest.java b/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/http/WebSocketTest.java new file mode 100644 index 00000000000..6fe10b76233 --- /dev/null +++ b/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/http/WebSocketTest.java @@ -0,0 +1,49 @@ +/** + * Copyright (C) 2015 Red Hat, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.fabric8.kubernetes.client.http; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.net.URI; +import java.util.stream.Stream; + +import static io.fabric8.kubernetes.client.http.WebSocket.toWebSocketUri; +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.params.provider.Arguments.arguments; + +class WebSocketTest { + + @ParameterizedTest(name = "{index}: toWebSocketUri: from ''{0}'' changes protocol to ''{1}''") + @MethodSource("toWebSocketUriInput") + void toWebSocketUriFromHttp(String uri, String expectedScheme) { + // When + final URI result = toWebSocketUri(URI.create(uri)); + // Then + assertThat(result).hasScheme(expectedScheme); + } + + static Stream toWebSocketUriInput() { + return Stream.of( + arguments("http://example.com", "ws"), + arguments("https://example.com", "wss"), + arguments("wss://example.com", "wss"), + arguments("ws://example.com", "ws")); + } + +} diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/BaseClient.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/BaseClient.java index 4b311bde3ca..a98364ba686 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/BaseClient.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/BaseClient.java @@ -189,10 +189,14 @@ public boolean supports(Class type) { if (Utils.isNullOrEmpty(typeApiVersion) || Utils.isNullOrEmpty(typeKind)) { return false; } - return getApiResources(ApiVersionUtil.joinApiGroupAndVersion( - HasMetadata.getGroup(type), HasMetadata.getVersion(type))).getResources() - .stream() - .anyMatch(r -> typeKind.equals(r.getKind())); + final APIResourceList apiResources = getApiResources(ApiVersionUtil.joinApiGroupAndVersion( + HasMetadata.getGroup(type), HasMetadata.getVersion(type))); + if (apiResources == null) { + return false; + } + return apiResources.getResources() + .stream() + .anyMatch(r -> typeKind.equals(r.getKind())); } @Override diff --git a/kubernetes-itests/pom.xml b/kubernetes-itests/pom.xml index 27109ea293a..e15d2101423 100644 --- a/kubernetes-itests/pom.xml +++ b/kubernetes-itests/pom.xml @@ -89,5 +89,33 @@ + + httpclient-jetty + + + io.fabric8 + openshift-client + test + + + io.fabric8 + kubernetes-httpclient-okhttp + + + + + io.fabric8 + kubernetes-httpclient-jetty + + + + + + + + + + + diff --git a/kubernetes-itests/src/test/java/io/fabric8/kubernetes/PodIT.java b/kubernetes-itests/src/test/java/io/fabric8/kubernetes/PodIT.java index 95a8209f531..c9f1349778b 100644 --- a/kubernetes-itests/src/test/java/io/fabric8/kubernetes/PodIT.java +++ b/kubernetes-itests/src/test/java/io/fabric8/kubernetes/PodIT.java @@ -32,7 +32,6 @@ import io.fabric8.kubernetes.client.dsl.ExecWatch; import io.fabric8.kubernetes.client.dsl.PodResource; import io.fabric8.kubernetes.client.readiness.Readiness; -import io.fabric8.kubernetes.client.utils.IOHelpers; import org.junit.jupiter.api.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,7 +45,9 @@ import java.nio.file.Files; import java.nio.file.Path; import java.util.Arrays; +import java.util.Collections; import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -137,31 +138,28 @@ void evict() { .withSpec(pod1.getSpec()) .build(); - client.pods().withName(pod1.getMetadata().getName()) - .waitUntilReady(POD_READY_WAIT_IN_SECONDS, TimeUnit.SECONDS); + client.pods().resource(pod1).waitUntilReady(POD_READY_WAIT_IN_SECONDS, TimeUnit.SECONDS); - client.pods().createOrReplace(pod2); - client.pods().withName(pod2.getMetadata().getName()) - .waitUntilReady(POD_READY_WAIT_IN_SECONDS, TimeUnit.SECONDS); + client.pods().resource(pod2).createOrReplace(); + client.pods().resource(pod2).waitUntilReady(POD_READY_WAIT_IN_SECONDS, TimeUnit.SECONDS); - client.policy().v1beta1().podDisruptionBudget().createOrReplace(pdb); + client.resource(pdb).createOrReplace(); // the server needs to process the pdb before the eviction can proceed, so we'll need to wait here await().atMost(5, TimeUnit.MINUTES) .until(() -> client.pods().withName(pod2.getMetadata().getName()).evict()); // cant evict because only one left - assertFalse(client.pods().withName(pod1.getMetadata().getName()).evict()); + assertFalse(client.pods().resource(pod1).evict()); // ensure it really is still up - assertTrue(Readiness.getInstance().isReady(client.pods().withName(pod1.getMetadata().getName()).fromServer().get())); + assertTrue(Readiness.getInstance().isReady(client.pods().resource(pod1).fromServer().get())); // create another pod to satisfy PDB - client.pods().createOrReplace(pod3); - client.pods().withName(pod3.getMetadata().getName()) - .waitUntilReady(POD_READY_WAIT_IN_SECONDS, TimeUnit.SECONDS); + client.pods().resource(pod3).createOrReplace(); + client.pods().resource(pod3).waitUntilReady(POD_READY_WAIT_IN_SECONDS, TimeUnit.SECONDS); // can now evict - assertTrue(client.pods().withName(pod1.getMetadata().getName()).evict()); + assertTrue(client.pods().resource(pod3).evict()); } @Test @@ -172,16 +170,17 @@ void log() { } @Test - void exec() throws InterruptedException, IOException { + void exec() throws Exception { client.pods().withName("pod-standard").waitUntilReady(POD_READY_WAIT_IN_SECONDS, TimeUnit.SECONDS); final CountDownLatch execLatch = new CountDownLatch(1); ByteArrayOutputStream out = new ByteArrayOutputStream(); - AtomicBoolean closed = new AtomicBoolean(); + final AtomicBoolean closed = new AtomicBoolean(); + final AtomicBoolean failed = new AtomicBoolean(); + final CompletableFuture exitStatus = new CompletableFuture<>(); int[] exitCode = new int[] { Integer.MAX_VALUE }; - ExecWatch execWatch = client.pods().withName("pod-standard") + client.pods().withName("pod-standard") .writingOutput(out) - .redirectingErrorChannel() - .withTTY().usingListener(new ExecListener() { + .usingListener(new ExecListener() { @Override public void onOpen() { logger.info("Shell was opened"); @@ -190,6 +189,7 @@ public void onOpen() { @Override public void onFailure(Throwable t, Response failureResponse) { logger.info("Shell barfed"); + failed.set(true); execLatch.countDown(); } @@ -203,13 +203,15 @@ public void onClose(int i, String s) { @Override public void onExit(int code, Status status) { exitCode[0] = code; + exitStatus.complete(status); } - }).exec("date"); - // the stream must be read or closed to receive onClose - assertEquals("{\"metadata\":{},\"status\":\"Success\"}", IOHelpers.readFully(execWatch.getErrorChannel())); - execLatch.await(5, TimeUnit.SECONDS); + }).exec("sh", "-c", "echo 'hello world!'"); + assertThat(exitStatus.get(9995, TimeUnit.SECONDS)) + .hasFieldOrPropertyWithValue("status", "Success"); + assertTrue(execLatch.await(9995, TimeUnit.SECONDS)); assertEquals(0, exitCode[0]); assertTrue(closed.get()); + assertFalse(failed.get()); assertNotNull(out.toString()); } @@ -242,7 +244,7 @@ void uploadFile() throws IOException { client.pods().withName("pod-standard").waitUntilReady(POD_READY_WAIT_IN_SECONDS, TimeUnit.SECONDS); // Wait for resources to get ready final Path tmpFile = Files.createTempFile("PodIT", "toBeUploaded"); - Files.write(tmpFile, Arrays.asList("I'm uploaded")); + Files.write(tmpFile, Collections.singletonList("I'm uploaded")); assertUploaded("pod-standard", tmpFile, "/tmp/toBeUploaded"); assertUploaded("pod-standard", tmpFile, "/tmp/001_special_!@#\\$^&(.mp4"); diff --git a/pom.xml b/pom.xml index 8cead696256..79ff6d4a718 100644 --- a/pom.xml +++ b/pom.xml @@ -86,6 +86,7 @@ 1.15.0 1.15.0_1 2.13.3 + 11.0.9 0.2.2 3.8.5 3.6.4 @@ -94,24 +95,21 @@ 3.0.2 + 5.8.2 3.23.1 4.2.0 15.6.0 + 4.6.1 - 2.5.0_1 - 1.4.2 1.4.2_1 1.0.2 1.0.1_1 - 1.11-8 1.11-8_1 1.3.0 8.0.1 ${jackson.version} - 5.8.2 0.3.0 1.7.36 - 4.6.1 1.18.24 1.30 1.70 @@ -177,7 +175,7 @@ false 2020-11-14T12:24:00Z 2.0.1.Final - + io.fabric8.kubernetes.api.model @@ -402,7 +400,7 @@ io.fabric8 kubernetes-httpclient-okhttp - ${project.version} + ${project.version} com.squareup.okhttp3 @@ -414,6 +412,11 @@ kubernetes-httpclient-jdk ${project.version} + + io.fabric8 + kubernetes-httpclient-jetty + ${project.version} + io.fabric8 openshift-client-api @@ -754,6 +757,21 @@ ${bouncycastle.version} true + + org.eclipse.jetty + jetty-client + ${jetty.version} + + + org.eclipse.jetty.http2 + http2-http-client-transport + ${jetty.version} + + + org.eclipse.jetty.websocket + websocket-jetty-client + ${jetty.version} + org.projectlombok lombok @@ -765,6 +783,12 @@ slf4j-api ${slf4j.version} + + org.slf4j + slf4j-simple + ${slf4j.version} + provided + @@ -1410,6 +1434,7 @@ httpclient-jdk + httpclient-jetty httpclient-tests