diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/PortForwarderWebsocket.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/PortForwarderWebsocket.java index 9ab8033c82a..498206487a7 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/PortForwarderWebsocket.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/PortForwarderWebsocket.java @@ -57,7 +57,7 @@ public class PortForwarderWebsocket implements PortForwarder { private static final Logger LOG = LoggerFactory.getLogger(PortForwarderWebsocket.class); - private OkHttpClient client; + private final OkHttpClient client; public PortForwarderWebsocket(OkHttpClient client) { this.client = client; @@ -193,7 +193,7 @@ public PortForward forward(URL resourceBaseUrl, int port, final ReadableByteChan private int messagesRead = 0; - private ExecutorService pumperService = Executors.newSingleThreadExecutor(); + private final ExecutorService pumperService = Executors.newSingleThreadExecutor(); @Override public void onOpen(final WebSocket webSocket, Response response) { @@ -298,7 +298,7 @@ public void onFailure(WebSocket webSocket, Throwable t, Response response) { LOG.debug("{}: onFailure", logPrefix); if (alive.get()) { serverThrowables.add(t); - LOG.error(logPrefix + ": Throwable received from websocket", t); + LOG.error("{}: Throwable received from websocket", logPrefix, t); closeForwarder(); } } @@ -321,14 +321,14 @@ private void closeForwarder() { try { in.close(); } catch (IOException e) { - LOG.error(logPrefix + ": Error while closing the client input channel", e); + LOG.error("{}: Error while closing the client input channel", logPrefix, e); } } if (out != null && out != in) { try { out.close(); } catch (IOException e) { - LOG.error(logPrefix + ": Error while closing the client output channel", e); + LOG.error("{}: Error while closing the client output channel", logPrefix, e); } } closeExecutor(pumperService); @@ -388,6 +388,7 @@ public static void closeQuietly(Closeable... cloaseables) { c.close(); } } catch (IOException ioe) { + // Ignored } } } diff --git a/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/PodTest.java b/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/PodTest.java index c3c38b964c1..03dc739dcda 100644 --- a/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/PodTest.java +++ b/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/PodTest.java @@ -46,7 +46,6 @@ import io.fabric8.kubernetes.client.Watcher; import io.fabric8.kubernetes.client.dsl.ExecListener; import io.fabric8.kubernetes.client.dsl.ExecWatch; -import io.fabric8.kubernetes.client.dsl.NonNamespaceOperation; import io.fabric8.kubernetes.client.dsl.PodResource; import io.fabric8.kubernetes.client.dsl.internal.core.v1.PodOperationsImpl; import io.fabric8.kubernetes.client.server.mock.KubernetesServer; @@ -54,26 +53,38 @@ import io.fabric8.kubernetes.client.utils.Utils; import okhttp3.Response; import okio.ByteString; -import org.assertj.core.util.Files; import org.junit.Assert; import org.junit.Rule; import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.migrationsupport.rules.EnableRuleMigrationSupport; +import org.junit.rules.TemporaryFolder; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; @EnableRuleMigrationSupport -public class PodTest { +class PodTest { + @Rule public KubernetesServer server = new KubernetesServer(); + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); + + private KubernetesClient client; + + @BeforeEach + void setUp() { + client = server.getClient().inNamespace("test"); + } @Test - public void testList() { + void testList() { server.expect().withPath("/api/v1/namespaces/test/pods").andReturn(200, new PodListBuilder().build()).once(); server.expect().withPath("/api/v1/namespaces/ns1/pods").andReturn(200, new PodListBuilder() .addNewItem().and() @@ -85,8 +96,6 @@ public void testList() { .addNewItem() .and().build()).once(); - - KubernetesClient client = server.getClient(); PodList podList = client.pods().list(); assertNotNull(podList); assertEquals(0, podList.getItems().size()); @@ -101,7 +110,7 @@ public void testList() { } @Test - public void testListWithLables() { + void testListWithLabels() { server.expect().withPath("/api/v1/namespaces/test/pods?labelSelector=" + Utils.toUrlEncoded("key1=value1,key2=value2,key3=value3")).andReturn(200, new PodListBuilder().build()).always(); server.expect().withPath("/api/v1/namespaces/test/pods?labelSelector=" + Utils.toUrlEncoded("key1=value1,key2=value2")).andReturn(200, new PodListBuilder() .addNewItem().and() @@ -109,7 +118,6 @@ public void testListWithLables() { .addNewItem().and() .build()).once(); - KubernetesClient client = server.getClient(); PodList podList = client.pods() .withLabel("key1", "value1") .withLabel("key2","value2") @@ -130,13 +138,12 @@ public void testListWithLables() { } @Test - public void testListWithFields() { + void testListWithFields() { server.expect().withPath("/api/v1/namespaces/test/pods?fieldSelector=" + Utils.toUrlEncoded("key1=value1,key2=value2,key3!=value3,key3!=value4")).andReturn(200, new PodListBuilder() .addNewItem().and() .addNewItem().and() .build()).once(); - KubernetesClient client = server.getClient(); PodList podList = client.pods() .withField("key1", "value1") .withField("key2","value2") @@ -144,14 +151,13 @@ public void testListWithFields() { .withoutField("key3", "value4") .list(); - assertNotNull(podList); assertEquals(2, podList.getItems().size()); } @Test - public void testEditMissing() { + void testEditMissing() { Assertions.assertThrows(KubernetesClientException.class, () -> { server.expect().withPath("/api/v1/namespaces/test/pods/pod1").andReturn(404, "error message from kubernetes").always(); KubernetesClient client = server.getClient(); @@ -161,11 +167,10 @@ public void testEditMissing() { } @Test - public void testDelete() { + void testDelete() { server.expect().withPath("/api/v1/namespaces/test/pods/pod1").andReturn(200, new PodBuilder().build()).once(); server.expect().withPath("/api/v1/namespaces/ns1/pods/pod2").andReturn(200, new PodBuilder().build()).once(); - KubernetesClient client = server.getClient(); Boolean deleted = client.pods().withName("pod1").delete(); assertTrue(deleted); @@ -178,7 +183,7 @@ public void testDelete() { } @Test - public void testDeleteMulti() { + void testDeleteMulti() { Pod pod1 = new PodBuilder().withNewMetadata().withName("pod1").withNamespace("test").and().build(); Pod pod2 = new PodBuilder().withNewMetadata().withName("pod2").withNamespace("ns1").and().build(); Pod pod3 = new PodBuilder().withNewMetadata().withName("pod3").withNamespace("any").and().build(); @@ -186,8 +191,6 @@ public void testDeleteMulti() { server.expect().withPath("/api/v1/namespaces/test/pods/pod1").andReturn(200, pod1).once(); server.expect().withPath("/api/v1/namespaces/ns1/pods/pod2").andReturn(200, pod2).once(); - KubernetesClient client = server.getClient(); - Boolean deleted = client.pods().inAnyNamespace().delete(pod1, pod2); assertTrue(deleted); @@ -196,11 +199,10 @@ public void testDeleteMulti() { } @Test - public void testDeleteWithNamespaceMismatch() { + void testDeleteWithNamespaceMismatch() { Assertions.assertThrows(KubernetesClientException.class, () -> { Pod pod1 = new PodBuilder().withNewMetadata().withName("pod1").withNamespace("test").and().build(); Pod pod2 = new PodBuilder().withNewMetadata().withName("pod2").withNamespace("ns1").and().build(); - KubernetesClient client = server.getClient(); Boolean deleted = client.pods().inNamespace("test1").delete(pod1); assertFalse(deleted); @@ -208,12 +210,10 @@ public void testDeleteWithNamespaceMismatch() { } @Test - public void testDeleteWithPropagationPolicy() { + void testDeleteWithPropagationPolicy() { Pod pod1 = new PodBuilder().withNewMetadata().withName("pod1").withNamespace("test").and().build(); server.expect().withPath("/api/v1/namespaces/test/pods/pod1").andReturn(200, pod1).once(); - KubernetesClient client = server.getClient(); - Boolean deleted = client.pods().inNamespace("test").withName("pod1").withPropagationPolicy(DeletionPropagation.FOREGROUND).delete(); assertTrue(deleted); } @@ -226,8 +226,6 @@ void testEvict() { server.expect().withPath("/api/v1/namespaces/ns1/pods/pod3/eviction").andReturn(200, new PodBuilder().build()).once(); server.expect().withPath("/api/v1/namespaces/ns1/pods/pod4/eviction").andReturn(500, new PodBuilder().build()).once(); - KubernetesClient client = server.getClient(); - Boolean deleted = client.pods().withName("pod1").evict(); assertTrue(deleted); @@ -251,18 +249,17 @@ void testEvict() { } @Test - public void testCreateWithNameMismatch() { + void testCreateWithNameMismatch() { Assertions.assertThrows(KubernetesClientException.class, () -> { Pod pod1 = new PodBuilder().withNewMetadata().withName("pod1").withNamespace("test").and().build(); Pod pod2 = new PodBuilder().withNewMetadata().withName("pod2").withNamespace("ns1").and().build(); - KubernetesClient client = server.getClient(); client.pods().inNamespace("test1").withName("mypod1").create(pod1); }); } @Test - public void testGetLog() { + void testGetLog() { String pod1Log = "pod1Log"; String pod2Log = "pod2Log"; String pod3Log = "pod3Log"; @@ -275,8 +272,6 @@ public void testGetLog() { server.expect().withPath("/api/v1/namespaces/test4/pods/pod1/log?pretty=false&limitBytes=100").andReturn(200, pod1Log).once(); server.expect().withPath("/api/v1/namespaces/test5/pods/pod1/log?pretty=false&tailLines=1×tamps=true").andReturn(200, pod1Log).once(); - KubernetesClient client = server.getClient(); - String log = client.pods().withName("pod1").getLog(true); assertEquals(pod1Log, log); @@ -297,7 +292,7 @@ public void testGetLog() { } @Test - public void testExec() throws InterruptedException { + void testExec() throws InterruptedException { String expectedOutput = "file1 file2"; server.expect().withPath("/api/v1/namespaces/test/pods/pod1/exec?command=ls&stdout=true") .andUpgradeToWebSocket() @@ -305,8 +300,6 @@ public void testExec() throws InterruptedException { .done() .always(); - KubernetesClient client = server.getClient(); - final CountDownLatch execLatch = new CountDownLatch(1); ByteArrayOutputStream baos = new ByteArrayOutputStream(); ExecWatch watch = client.pods().withName("pod1").writingOutput(baos).usingListener(new ExecListener() { @@ -332,51 +325,47 @@ public void onClose(int code, String reason) { } - @Test public void testWatch() throws InterruptedException { + // Given //We start with a list Pod pod1 = new PodBuilder() - .withNewMetadata() - .withName("pod1") - .withResourceVersion("1") - .endMetadata() - .build(); - - server.expect().withPath("/api/v1/namespaces/test/pods").andReturn(200, new PodListBuilder() - .withNewMetadata() - .withResourceVersion("1") - .endMetadata() - .addToItems(pod1) - .build() + .withNewMetadata() + .withName("pod1") + .withResourceVersion("1") + .endMetadata() + .build(); + server.expect().withPath("/api/v1/namespaces/test/pods").andReturn(200, new PodListBuilder() + .withNewMetadata() + .withResourceVersion("1") + .endMetadata() + .addToItems(pod1) + .build() ).once(); - - server.expect().withPath("/api/v1/namespaces/test/pods?fieldSelector=metadata.name%3Dpod1&watch=true") - .andUpgradeToWebSocket() - .open() - .waitFor(15000).andEmit(new WatchEvent(pod1, "DELETED")) - .done() - .always(); - - KubernetesClient client = server.getClient(); - + server.expect().withPath("/api/v1/namespaces/test/pods?fieldSelector=metadata.name%3Dpod1&watch=true") + .andUpgradeToWebSocket() + .open() + .waitFor(50).andEmit(new WatchEvent(pod1, "DELETED")) + .done() + .always(); final CountDownLatch deleteLatch = new CountDownLatch(1); - Watch watch = client.pods().withName("pod1").watch(new Watcher() { + final Watcher watcher = new Watcher() { @Override public void eventReceived(Action action, Pod resource) { - switch (action) { - case DELETED: - deleteLatch.countDown(); + if (action != Action.DELETED) { + fail(); } + deleteLatch.countDown(); } @Override public void onClose(KubernetesClientException cause) { - } - }); - - assertTrue(deleteLatch.await(10, TimeUnit.MINUTES)); + }; + // When + final Watch watch = client.pods().withName("pod1").watch(watcher); + // Then + assertTrue(deleteLatch.await(30, TimeUnit.SECONDS)); watch.close(); } @@ -413,11 +402,9 @@ public void testWait() throws InterruptedException { .build(); - Pod ready = new PodBuilder() - .withNewMetadata() - .withName("pod1") + Pod ready = new PodBuilder(notReady) + .editMetadata() .withResourceVersion("2") - .withNamespace("test") .endMetadata() .withNewStatus() .addNewCondition() @@ -438,18 +425,16 @@ public void testWait() throws InterruptedException { server.expect().get().withPath("/api/v1/namespaces/test/pods?fieldSelector=metadata.name%3Dpod1&resourceVersion=1&watch=true").andUpgradeToWebSocket() .open() - .waitFor(1000).andEmit(new WatchEvent(ready, "MODIFIED")) + .waitFor(50).andEmit(new WatchEvent(ready, "MODIFIED")) .done() .always(); - KubernetesClient client = server.getClient(); Pod result = client.pods().withName("pod1").waitUntilReady(5, TimeUnit.SECONDS); Assert.assertEquals("2", result.getMetadata().getResourceVersion()); } @Test - public void testPortForward() throws IOException { - + void testPortForward() throws IOException { server.expect().withPath("/api/v1/namespaces/test/pods/pod1/portforward?ports=123") .andUpgradeToWebSocket() .open() @@ -460,11 +445,11 @@ public void testPortForward() throws IOException { .done() .once(); - KubernetesClient client = server.getClient(); - - try(LocalPortForward portForward = client.pods().withName("pod1").portForward(123)) { + try( + LocalPortForward portForward = client.pods().withName("pod1").portForward(123); + SocketChannel channel = SocketChannel.open() + ) { int localPort = portForward.getLocalPort(); - SocketChannel channel = SocketChannel.open(); assertTrue(channel.connect(new InetSocketAddress("localhost", localPort))); ByteBuffer buffer = ByteBuffer.allocate(1024); @@ -473,13 +458,12 @@ public void testPortForward() throws IOException { read = channel.read(buffer); } while(read >= 0); buffer.flip(); - String data = ByteString.of(buffer).utf8(); - assertEquals("Hello World", data); + channel.socket().close(); + assertEquals("Hello World", ByteString.of(buffer).utf8()); assertFalse(portForward.errorOccurred()); - assertEquals(portForward.getClientThrowables().size(), 0); - assertEquals(portForward.getServerThrowables().size(), 0); + assertEquals(0, portForward.getClientThrowables().size()); + assertEquals(0, portForward.getServerThrowables().size()); } - } @Test @@ -495,8 +479,6 @@ public void testPortForwardWithChannel() throws InterruptedException, IOExceptio .done() .once(); - KubernetesClient client = server.getClient(); - ByteArrayInputStream in = new ByteArrayInputStream(new byte[0]); ReadableByteChannel inChannel = Channels.newChannel(in); @@ -516,24 +498,21 @@ public void testPortForwardWithChannel() throws InterruptedException, IOExceptio @Test public void testOptionalUpload() { Assertions.assertThrows(KubernetesClientException.class, () -> { - KubernetesClient client = server.getClient(); - client.pods().inNamespace("ns1").withName("pod2").dir("/etc/hosts/dir").upload(Files.temporaryFolder().toPath()); + client.pods().inNamespace("ns1").withName("pod2").dir("/etc/hosts/dir").upload(temporaryFolder.newFolder().toPath()); }); } @Test public void testOptionalCopy() { Assertions.assertThrows(KubernetesClientException.class, () -> { - KubernetesClient client = server.getClient(); - client.pods().inNamespace("ns1").withName("pod2").file("/etc/hosts").copy(Files.temporaryFolder().toPath()); + client.pods().inNamespace("ns1").withName("pod2").file("/etc/hosts").copy(temporaryFolder.newFolder().toPath()); }); } @Test public void testOptionalCopyDir() { Assertions.assertThrows(KubernetesClientException.class, () -> { - KubernetesClient client = server.getClient(); - client.pods().inNamespace("ns1").withName("pod2").dir("/etc/hosts").copy(Files.temporaryFolder().toPath()); + client.pods().inNamespace("ns1").withName("pod2").dir("/etc/hosts").copy(temporaryFolder.newFolder().toPath()); }); } @Test @@ -561,8 +540,6 @@ public void testListFromServer() { .withPath("/api/v1/namespaces/test/pods/pod1") .andReturn(200, serverPod).once(); - KubernetesClient client = server.getClient(); - List resources = client.resourceList(clientPod).fromServer().get(); assertNotNull(resources); diff --git a/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/WatchOverHTTP.java b/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/WatchOverHTTP.java deleted file mode 100644 index c84408ea1a8..00000000000 --- a/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/WatchOverHTTP.java +++ /dev/null @@ -1,146 +0,0 @@ -/** - * Copyright (C) 2015 Red Hat, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.fabric8.kubernetes.client.mock; - -import io.fabric8.kubernetes.api.model.Pod; -import io.fabric8.kubernetes.api.model.PodBuilder; -import io.fabric8.kubernetes.api.model.Status; -import io.fabric8.kubernetes.api.model.StatusBuilder; -import io.fabric8.kubernetes.api.model.WatchEvent; -import io.fabric8.kubernetes.api.model.WatchEventBuilder; -import io.fabric8.kubernetes.client.KubernetesClient; -import io.fabric8.kubernetes.client.KubernetesClientException; -import io.fabric8.kubernetes.client.Watch; -import io.fabric8.kubernetes.client.Watcher; -import io.fabric8.kubernetes.client.server.mock.KubernetesServer; -import junit.framework.AssertionFailedError; -import org.junit.Rule; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.migrationsupport.rules.EnableRuleMigrationSupport; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.net.HttpURLConnection; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -import static org.junit.jupiter.api.Assertions.assertTrue; - -@EnableRuleMigrationSupport -public class WatchOverHTTP { - static final Pod pod1 = new PodBuilder().withNewMetadata().withNamespace("test").withName("pod1") - .withResourceVersion("1").endMetadata().build(); - static final Status outdatedStatus = new StatusBuilder().withCode(HttpURLConnection.HTTP_GONE) - .withMessage( - "401: The event in requested index is outdated and cleared (the requested history has been cleared [3/1]) [2]") - .build(); - static final WatchEvent outdatedEvent = new WatchEventBuilder().withStatusObject(outdatedStatus).build(); - final String path = "/api/v1/namespaces/test/pods?fieldSelector=metadata.name%3Dpod1&resourceVersion=1&watch=true"; - @Rule - public KubernetesServer server = new KubernetesServer(false); - Logger logger = LoggerFactory.getLogger(WatchTest.class); - - @Test - public void testDeleted() throws InterruptedException { - logger.info("testDeleted"); - KubernetesClient client = server.getClient().inNamespace("test"); - - server.expect() - .withPath(path) - .andReturn(200, "Failed WebSocket Connection").once(); - server.expect().withPath(path).andReturnChunked(200, - new WatchEvent(pod1, "DELETED"), "\n", - new WatchEvent(pod1, "ADDED"), "\n").once(); - - final CountDownLatch addLatch = new CountDownLatch(1); - final CountDownLatch deleteLatch = new CountDownLatch(1); - try (Watch watch = client.pods().withName("pod1").withResourceVersion("1").watch(new Watcher() { - @Override - public void eventReceived(Action action, Pod resource) { - switch (action) { - case DELETED: - deleteLatch.countDown(); - break; - case ADDED: - addLatch.countDown(); - break; - default: - throw new AssertionFailedError(); - } - } - - @Override - public void onClose(KubernetesClientException cause) {} - })) /* autoclose */ { - assertTrue(addLatch.await(10, TimeUnit.SECONDS)); - assertTrue(deleteLatch.await(10, TimeUnit.SECONDS)); - } - } - - @Test - public void testOutdated() throws InterruptedException { - logger.info("testOutdated"); - KubernetesClient client = server.getClient().inNamespace("test"); - - server.expect() - .withPath(path) - .andReturn(200, "Failed WebSocket Connection").once(); - server.expect().withPath(path).andReturnChunked(200, outdatedEvent, "\n").once(); - - final boolean[] onCloseCalled = {false}; - try (Watch watch = client.pods().withName("pod1").withResourceVersion("1").watch(new Watcher() { - @Override - public void eventReceived(Action action, Pod resource) { - throw new AssertionFailedError(); - } - - @Override - public void onClose(KubernetesClientException cause) { - onCloseCalled[0] = true; - } - })){}; - assertTrue(onCloseCalled[0]); - } - - @Test - public void testHttpErrorReconnect() throws InterruptedException { - logger.info("testHttpErrorReconnect"); - KubernetesClient client = server.getClient().inNamespace("test"); - - server.expect() - .withPath(path) - .andReturn(200, "Failed WebSocket Connection").once(); - server.expect().withPath(path).andReturnChunked(503, new StatusBuilder().withCode(503).build()).times(6); - server.expect().withPath(path).andReturnChunked(200, outdatedEvent, "\n").once(); - - final CountDownLatch closeLatch = new CountDownLatch(1); - try (Watch watch = client.pods().withName("pod1").withResourceVersion("1").watch(new Watcher() { - @Override - public void eventReceived(Action action, Pod resource) { - throw new AssertionFailedError(); - } - - @Override - public void onClose(KubernetesClientException cause) { - logger.debug("onClose", cause); - closeLatch.countDown(); - } - })) /* autoclose */ { - assertTrue(closeLatch.await(3, TimeUnit.MINUTES)); - } - } -}