Skip to content

Commit

Permalink
fix test case and change behavior of WatchHTTPManager
Browse files Browse the repository at this point in the history
  • Loading branch information
honnix committed Oct 7, 2019
1 parent 45f72ff commit 5c0e393
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package io.fabric8.kubernetes.client.dsl.internal;

import static java.net.HttpURLConnection.HTTP_GONE;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.api.model.KubernetesResource;
Expand All @@ -28,22 +30,28 @@
import io.fabric8.kubernetes.client.dsl.base.BaseOperation;
import io.fabric8.kubernetes.client.dsl.base.OperationSupport;
import io.fabric8.kubernetes.client.utils.Utils;
import okhttp3.*;
import okhttp3.logging.HttpLoggingInterceptor;
import okio.BufferedSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import static java.net.HttpURLConnection.HTTP_GONE;
import okhttp3.Call;
import okhttp3.Callback;
import okhttp3.HttpUrl;
import okhttp3.Interceptor;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import okhttp3.logging.HttpLoggingInterceptor;
import okio.BufferedSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class WatchHTTPManager<T extends HasMetadata, L extends KubernetesResourceList<T>> implements
Watch {
Expand Down Expand Up @@ -265,16 +273,14 @@ public void onMessage(String messageSource) throws IOException {
}
} else if (object instanceof Status) {
Status status = (Status) object;
// The resource version no longer exists - this has to be handled by the caller.

if (status.getCode() == HTTP_GONE) {
// exception
// shut down executor, etc.
close();
watcher.onClose(new KubernetesClientException(status));
return;
logger.info("The resource version {} no longer exists. Scheduling a reconnect.", resourceVersion.get());
resourceVersion.set(null);
scheduleReconnect();
} else {
logger.error("Error received: {}", status.toString());
}

logger.error("Error received: {}", status.toString());
} else {
logger.error("Unknown message received: {}", messageSource);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,10 @@
import io.fabric8.kubernetes.client.Watch;
import io.fabric8.kubernetes.client.Watcher;
import io.fabric8.kubernetes.client.server.mock.KubernetesServer;
import junit.framework.AssertionFailedError;

import java.net.HttpURLConnection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import junit.framework.AssertionFailedError;
import org.junit.Rule;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
Expand All @@ -59,38 +57,51 @@ public class WatchTest {
static final WatchEvent outdatedEvent = new WatchEventBuilder().withStatusObject(outdatedStatus).build();

@Test
public void testDeletedAndOutdated() throws InterruptedException {
public void testDeletedOutdatedAndAdded() throws InterruptedException {
logger.info("testDeletedAndOutdated");
KubernetesClient client = server.getClient().inNamespace("test");

final String path = "/api/v1/namespaces/test/pods?fieldSelector=metadata.name%3Dpod1&resourceVersion=1&watch=true";
final String pathWithoutResourceVersion = "/api/v1/namespaces/test/pods?fieldSelector=metadata.name%3Dpod1&watch=true";

// DELETED event, then history outdated
server.expect()
.withPath("/api/v1/namespaces/test/pods?fieldSelector=metadata.name%3Dpod1&resourceVersion=1&watch=true")
.withPath(path)
.andUpgradeToWebSocket().open().waitFor(2000).andEmit(new WatchEvent(pod1, "DELETED")).waitFor(2000)
.andEmit(outdatedEvent).done().once();

// ADDED event
server.expect()
.withPath(pathWithoutResourceVersion)
.andUpgradeToWebSocket().open().waitFor(2000).andEmit(new WatchEvent(pod1, "ADDED")).done().once();

final CountDownLatch deleteLatch = new CountDownLatch(1);
final CountDownLatch closeLatch = new CountDownLatch(1);
final CountDownLatch addLatch = new CountDownLatch(1);
final boolean[] onCloseCalled = {false};
try (Watch watch = client.pods().withName("pod1").withResourceVersion("1").watch(new Watcher<Pod>() {
@Override
public void eventReceived(Action action, Pod resource) {
switch (action) {
case DELETED:
deleteLatch.countDown();
break;
case ADDED:
addLatch.countDown();
break;
default:
throw new AssertionFailedError();
}
}

@Override
public void onClose(KubernetesClientException cause) {
closeLatch.countDown();
onCloseCalled[0] =true;
}
})) /* autoclose */ {
assertTrue(deleteLatch.await(10, TimeUnit.SECONDS));
assertTrue(closeLatch.await(10, TimeUnit.SECONDS));
assertTrue(addLatch.await(10, TimeUnit.SECONDS));
}
assertTrue(onCloseCalled[0]);
}

@Test
Expand Down Expand Up @@ -128,25 +139,28 @@ public void testHttpErrorReconnect() throws InterruptedException {
// accept watch and disconnect
server.expect().withPath(path).andUpgradeToWebSocket().open().done().once();
// refuse reconnect attempts 6 times
server.expect().withPath(path).andReturn(503, new StatusBuilder().withCode(503).build()).times(6);
// accept next reconnect and send outdated event to stop the watch
server.expect().withPath(path).andUpgradeToWebSocket().open(outdatedEvent).done().once();
server.expect().withPath(path).andReturn(503, new StatusBuilder().withCode(503).build()).times(1);
// accept next reconnect and send ADDED event
server.expect().withPath(path)
.andUpgradeToWebSocket().open(new WatchEvent(pod1, "ADDED")).done().once();

final CountDownLatch closeLatch = new CountDownLatch(1);
final CountDownLatch addLatch = new CountDownLatch(1);
final boolean[] onCloseCalled = {false};
try (Watch watch = client.pods().withName("pod1").withResourceVersion("1").watch(new Watcher<Pod>() {
@Override
public void eventReceived(Action action, Pod resource) {
throw new AssertionFailedError();
addLatch.countDown();
}

@Override
public void onClose(KubernetesClientException cause) {
logger.debug("onClose", cause);
closeLatch.countDown();
onCloseCalled[0] =true;
}
})) /* autoclose */ {
assertTrue(closeLatch.await(3, TimeUnit.MINUTES));
assertTrue(addLatch.await(3, TimeUnit.MINUTES));
}
assertTrue(onCloseCalled[0]);
}

@Test
Expand Down

0 comments on commit 5c0e393

Please sign in to comment.