Skip to content

Commit

Permalink
Fix fabric8io#1961: Two SharedInformer issues related to kube-apiserv…
Browse files Browse the repository at this point in the history
…er unavailable and relisting

+ relist when 410 is received
+ set HasSynced() to false when Reflector faces error
  • Loading branch information
rohanKanojia committed Feb 27, 2020
1 parent 1571080 commit cc3c0d7
Show file tree
Hide file tree
Showing 7 changed files with 169 additions and 17 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

### 4.8-SNAPSHOT
#### Bugs
* Fix #1961: Two SharedInformer issues related to kube-apiserver unavailable and relisting

#### Improvements

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,11 @@ public synchronized List<T> byIndex(String indexName, String indexKey) {
return items;
}

/**
 * Accepts the populated flag required by the store contract and discards it.
 *
 * @param isPopulated ignored by this implementation
 */
@Override
public void isPopulated(boolean isPopulated) {
// Intentionally a no-op: the supplied flag is not stored anywhere.
// NOTE(review): presumably this store keeps no "populated" state of its own - confirm.
}

/**
* UpdateIndices modifies the objects location in the managed indexes, if there is
* an update, you must provide an oldObj
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,15 @@ public boolean hasSynced() {
}
}

/**
 * Overwrites the {@code populated} field of this store with the given value.
 * Despite the "is" prefix this is a setter and returns nothing.
 *
 * @param isPopulated new value for the {@code populated} field
 */
@Override
public void isPopulated(boolean isPopulated) {
// The assignment happens while holding the write lock.
lock.writeLock().lock();
try {
this.populated = isPopulated;
} finally {
// Release in finally so the lock is never left held.
lock.writeLock().unlock();
}
}


/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.fabric8.kubernetes.api.model.ListMeta;
import io.fabric8.kubernetes.api.model.ListOptionsBuilder;
import io.fabric8.kubernetes.api.model.ObjectMeta;
import io.fabric8.kubernetes.api.model.Status;
import io.fabric8.kubernetes.client.KubernetesClientException;
import io.fabric8.kubernetes.client.Watch;
import io.fabric8.kubernetes.client.Watcher;
Expand All @@ -28,6 +29,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.HttpURLConnection;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

Expand All @@ -41,6 +43,9 @@ public class ReflectorRunnable<T extends HasMetadata, TList extends KubernetesRe
private Class<T> apiTypeClass;
private AtomicBoolean isActive = new AtomicBoolean(true);
private OperationContext operationContext;
// isLastSyncResourceVersionGone is true if the previous list or watch request with lastSyncResourceVersion
// failed with an HTTP 410 (Gone) status code.
private boolean isLastSyncResourceVersionGone = false;

public ReflectorRunnable(Class<T> apiTypeClass, ListerWatcher listerWatcher, Store store, OperationContext operationContext) {
this.listerWatcher = listerWatcher;
Expand All @@ -55,6 +60,7 @@ public ReflectorRunnable(Class<T> apiTypeClass, ListerWatcher listerWatcher, Sto
*/
public void run() {
try {
Thread.currentThread().setName("Reflector-" + apiTypeClass.getSimpleName());
log.info("{}#Start listing and watching...", apiTypeClass);

TList list = listerWatcher.list(new ListOptionsBuilder().withWatch(Boolean.FALSE).withResourceVersion(null).withTimeoutSeconds(null).build(), null, operationContext);
Expand All @@ -68,6 +74,7 @@ public void run() {
}
this.syncWith(items, resourceVersion);
this.lastSyncResourceVersion = resourceVersion;
this.isLastSyncResourceVersionGone = false;

if (log.isDebugEnabled()) {
log.debug("{}#Start watching with {}...", apiTypeClass, lastSyncResourceVersion);
Expand Down Expand Up @@ -123,17 +130,27 @@ public void eventReceived(Action action, T resource) {
/**
 * Invoked when the watch is closed.
 * <p>
 * When the close was caused by an HTTP 410 (Gone) response the last synced resource
 * version is flagged as gone and a fresh list-and-watch cycle is started by calling
 * {@code run()} again. Any other exception is only logged.
 *
 * @param exception the cause of the close, or {@code null} if none was supplied
 */
public void onClose(KubernetesClientException exception) {
  log.error("Watch closing.");
  if (exception == null) {
    return;
  }
  log.error("watch closed due to {}", exception.getMessage());

  // The exception may carry no Status (or a Status without a code); guard both so
  // that this callback never fails with a NullPointerException.
  Status returnStatus = exception.getStatus();
  boolean resourceVersionGone = returnStatus != null
    && Integer.valueOf(HttpURLConnection.HTTP_GONE).equals(returnStatus.getCode());

  if (resourceVersionGone) {
    // Relist when HTTP_GONE is received
    isLastSyncResourceVersionGone = true;
    log.info("410(HTTP_GONE) recieved, initiating re-list and re-watch");
    run();
  } else {
    // Stack trace goes through the logger instead of printStackTrace().
    log.debug("exception received during watch", exception);
  }
}
});
} catch (Throwable t) {
log.info("{}#Watch connection got exception {}", apiTypeClass, t.getMessage());
}
} catch (Exception exception) {
log.error("Failure in list-watch: {}", exception.getMessage());
exception.printStackTrace();
log.error("Failure in list-watch: {}, cause: {}", exception.getMessage(), exception.getCause());
log.debug("exception in listing and watching", exception);
// Update store sync status to false
store.isPopulated(false);
}
}

Expand All @@ -149,4 +166,8 @@ public String getLastSyncResourceVersion() {
return lastSyncResourceVersion;
}

/**
 * Reports whether the last synced resource version was rejected as gone.
 * <p>
 * The flag is set when the watch closes with an HTTP 410 (Gone) status and is cleared
 * again after a list has been synced successfully.
 * NOTE(review): the field is a plain (non-volatile) boolean that is written from the
 * watch close callback - confirm whether cross-thread visibility matters to callers.
 *
 * @return {@code true} if the previous list or watch request failed with HTTP 410
 */
public boolean isLastSyncResourceVersionGone() {
return isLastSyncResourceVersionGone;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -98,4 +98,11 @@ public interface Store<T> {
*/
void resync();

/**
 * Updates the populated status of the cache. Despite the "is" prefix this is a
 * setter: it returns nothing and receives the new status as its argument.
 * <p>
 * The reflector passes {@code false} when its list-watch cycle fails with an exception,
 * e.g. because of an API error from the Kubernetes server. Implementations are free to
 * ignore the value.
 *
 * @param isPopulated boolean value indicating whether cache is populated or not
 */
void isPopulated(boolean isPopulated);

}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class CustomResourceInformerExample {
Expand All @@ -41,7 +42,7 @@ public static void main(String args[]) {
.build();

SharedInformerFactory sharedInformerFactory = client.informers();
SharedIndexInformer<Dummy> podInformer = sharedInformerFactory.sharedIndexInformerForCustomResource(crdContext, Dummy.class, DummyList.class, 15 * 60 * 1000);
SharedIndexInformer<Dummy> podInformer = sharedInformerFactory.sharedIndexInformerForCustomResource(crdContext, Dummy.class, DummyList.class, 1 * 60 * 1000);
logger.info("Informer factory initialized.");

podInformer.addEventHandler(
Expand All @@ -65,8 +66,21 @@ public void onDelete(Dummy pod, boolean deletedFinalStateUnknown) {

logger.info("Starting all registered informers");
sharedInformerFactory.startAllRegisteredInformers();

Executors.newSingleThreadExecutor().submit(() -> {
Thread.currentThread().setName("HAS_SYNCED_THREAD");
try {
for (;;) {
logger.info("podInformer.hasSynced() : {}", podInformer.hasSynced());
Thread.sleep(200);
}
} catch (InterruptedException inEx) {
logger.info("HAS_SYNCED_THREAD INTERRUPTED!");
}
});

// Wait for some time now
TimeUnit.MINUTES.sleep(15);
TimeUnit.MINUTES.sleep(60);
} catch (InterruptedException interruptedException) {
logger.info("interrupted: {}", interruptedException.getMessage());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,16 @@
import io.fabric8.kubernetes.client.informers.SharedInformerFactory;
import io.fabric8.kubernetes.client.server.mock.KubernetesServer;
import org.junit.Rule;
import org.junit.Test;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.migrationsupport.rules.EnableRuleMigrationSupport;

import java.net.HttpURLConnection;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.junit.Assert.assertEquals;

@EnableRuleMigrationSupport
public class DefaultSharedIndexInformerTest {
@Rule
public KubernetesServer server = new KubernetesServer(false);
Expand All @@ -51,9 +53,9 @@ public class DefaultSharedIndexInformerTest {
public void testNamespacedPodInformer() throws InterruptedException {
String startResourceVersion = "1000", endResourceVersion = "1001";

server.expect().withPath("/api/v1/pods")
server.expect().withPath("/api/v1/namespaces/test/pods")
.andReturn(200, new PodListBuilder().withNewMetadata().withResourceVersion(startResourceVersion).endMetadata().withItems(Arrays.asList()).build()).once();
server.expect().withPath("/api/v1/pods?resourceVersion=" + startResourceVersion + "&watch=true")
server.expect().withPath("/api/v1/namespaces/test/pods?resourceVersion=" + startResourceVersion + "&watch=true")
.andUpgradeToWebSocket()
.open()
.waitFor(1000)
Expand All @@ -63,7 +65,7 @@ public void testNamespacedPodInformer() throws InterruptedException {

KubernetesClient client = server.getClient();
SharedInformerFactory factory = client.informers();
SharedIndexInformer<Pod> podInformer = factory.sharedIndexInformerFor(Pod.class, PodList.class, 4000L);
SharedIndexInformer<Pod> podInformer = factory.sharedIndexInformerFor(Pod.class, PodList.class,4000L);

AtomicBoolean foundExistingPod = new AtomicBoolean(false);
podInformer.addEventHandler(
Expand All @@ -81,10 +83,10 @@ public void onUpdate(Pod oldObj, Pod newObj) { }
@Override
public void onDelete(Pod oldObj, boolean deletedFinalStateUnknown) { }
});
Thread.sleep(1000);
Thread.sleep(1000L);
factory.startAllRegisteredInformers();

Thread.sleep(3000L);
Thread.sleep(5000L);
assertEquals(true, foundExistingPod.get());
assertEquals(endResourceVersion, podInformer.lastSyncResourceVersion());

Expand All @@ -95,13 +97,13 @@ public void onDelete(Pod oldObj, boolean deletedFinalStateUnknown) { }
public void testAllNamespacedInformer() throws InterruptedException {
String startResourceVersion = "1000", endResourceVersion = "1001";

server.expect().withPath("/api/v1/pods")
server.expect().withPath("/api/v1/namespaces/test/pods")
.andReturn(200, new PodListBuilder().withNewMetadata().withResourceVersion(startResourceVersion).endMetadata().withItems(Arrays.asList()).build()).once();
server.expect().withPath("/api/v1/pods?resourceVersion=" + startResourceVersion + "&watch=true")
server.expect().withPath("/api/v1/namespaces/test/pods?resourceVersion=" + startResourceVersion + "&watch=true")
.andUpgradeToWebSocket()
.open()
.waitFor(1000)
.andEmit(new WatchEvent(new PodBuilder().withNewMetadata().withNamespace("default").withName("pod1").withResourceVersion(endResourceVersion).endMetadata().build(), "ADDED"))
.andEmit(new WatchEvent(new PodBuilder().withNewMetadata().withNamespace("test").withName("pod1").withResourceVersion(endResourceVersion).endMetadata().build(), "ADDED"))
.waitFor(2000)
.andEmit(outdatedEvent).done().always();

Expand All @@ -114,7 +116,7 @@ public void testAllNamespacedInformer() throws InterruptedException {
new ResourceEventHandler<Pod>() {
@Override
public void onAdd(Pod obj) {
if (obj.getMetadata().getName().equalsIgnoreCase("pod1") && obj.getMetadata().getNamespace().equalsIgnoreCase("default")) {
if (obj.getMetadata().getName().equalsIgnoreCase("pod1") && obj.getMetadata().getNamespace().equalsIgnoreCase("test")) {
foundExistingPod.set(true);
}
}
Expand All @@ -125,13 +127,106 @@ public void onUpdate(Pod oldObj, Pod newObj) { }
@Override
public void onDelete(Pod oldObj, boolean deletedFinalStateUnknown) { }
});
Thread.sleep(1000);
Thread.sleep(1000L);
factory.startAllRegisteredInformers();

Thread.sleep(3000L);
Thread.sleep(5000L);
assertEquals(true, foundExistingPod.get());
assertEquals(endResourceVersion, podInformer.lastSyncResourceVersion());

factory.stopAllRegisteredInformers();
}

/**
 * Checks that the informer re-lists and re-watches once the first watch has emitted
 * {@code outdatedEvent} (a fixture declared elsewhere in this class, presumably the
 * 410 Gone status). Success is detected through the MODIFIED event that only the
 * second watch delivers.
 */
@Test
public void shouldReconnectInCaseOf410() throws InterruptedException {
  final String startResourceVersion = "1000";
  final String midResourceVersion = "1001";
  final String mid2ResourceVersion = "1002";
  final String endResourceVersion = "1003";
  final String podsPath = "/api/v1/namespaces/test/pods";

  // Payloads served by the mock server, built up front to keep the expectations readable.
  PodList firstList = new PodListBuilder()
    .withNewMetadata().withResourceVersion(startResourceVersion).endMetadata()
    .withItems(Arrays.asList())
    .build();
  Pod addedPod = new PodBuilder()
    .withNewMetadata().withNamespace("test").withName("pod1").withResourceVersion(midResourceVersion).endMetadata()
    .build();
  PodList secondList = new PodListBuilder()
    .withNewMetadata().withResourceVersion(mid2ResourceVersion).endMetadata()
    .withItems(Arrays.asList())
    .build();
  Pod modifiedPod = new PodBuilder()
    .withNewMetadata().withNamespace("test").withName("pod1").withResourceVersion(endResourceVersion).endMetadata()
    .build();

  // Round one: initial list, then a watch that adds pod1 and finally emits outdatedEvent.
  server.expect().withPath(podsPath)
    .andReturn(200, firstList)
    .once();
  server.expect().withPath(podsPath + "?resourceVersion=" + startResourceVersion + "&watch=true")
    .andUpgradeToWebSocket()
    .open()
    .waitFor(1000)
    .andEmit(new WatchEvent(addedPod, "ADDED"))
    .waitFor(2000)
    .andEmit(outdatedEvent)
    .done()
    .always();

  // Round two: the re-list at the newer resource version and a watch that modifies pod1.
  server.expect().withPath(podsPath)
    .andReturn(200, secondList)
    .times(2);
  server.expect().withPath(podsPath + "?resourceVersion=" + mid2ResourceVersion + "&watch=true")
    .andUpgradeToWebSocket()
    .open()
    .waitFor(1000)
    .andEmit(new WatchEvent(modifiedPod, "MODIFIED"))
    .done()
    .always();

  KubernetesClient client = server.getClient();
  SharedInformerFactory factory = client.informers();
  SharedIndexInformer<Pod> podInformer = factory.sharedIndexInformerFor(Pod.class, PodList.class, 10000L);

  AtomicBoolean relistSuccessful = new AtomicBoolean(false);
  podInformer.addEventHandler(
    new ResourceEventHandler<Pod>() {
      @Override
      public void onAdd(Pod obj) { }

      @Override
      public void onUpdate(Pod oldObj, Pod newObj) {
        // Only the update of pod1 carrying the final resource version counts.
        if (!newObj.getMetadata().getName().equalsIgnoreCase("pod1")) {
          return;
        }
        if (newObj.getMetadata().getResourceVersion().equalsIgnoreCase(endResourceVersion)) {
          relistSuccessful.set(true);
        }
      }

      @Override
      public void onDelete(Pod oldObj, boolean deletedFinalStateUnknown) { }
    });
  Thread.sleep(1000L);
  factory.startAllRegisteredInformers();

  // Leave enough time for both list/watch rounds to complete.
  Thread.sleep(10000L);
  assertEquals(true, relistSuccessful.get());
  assertEquals(endResourceVersion, podInformer.lastSyncResourceVersion());

  factory.stopAllRegisteredInformers();
}

/**
 * Expects {@code hasSynced()} to report {@code false} at the end of the scenario. The
 * list and the watch are each served only once, and the watch ends by emitting
 * {@code outdatedEvent} (a fixture declared elsewhere in this class).
 */
@Test
public void testHasSynced() throws InterruptedException {
  final String startResourceVersion = "1000";
  final String endResourceVersion = "1001";
  final String podsPath = "/api/v1/namespaces/test/pods";

  PodList emptyList = new PodListBuilder()
    .withNewMetadata().withResourceVersion(startResourceVersion).endMetadata()
    .withItems(Arrays.asList())
    .build();
  Pod addedPod = new PodBuilder()
    .withNewMetadata().withNamespace("test").withName("pod1").withResourceVersion(endResourceVersion).endMetadata()
    .build();

  // Both expectations are single-use, so there is nothing left to serve afterwards.
  server.expect().withPath(podsPath)
    .andReturn(200, emptyList)
    .once();
  server.expect().withPath(podsPath + "?resourceVersion=" + startResourceVersion + "&watch=true")
    .andUpgradeToWebSocket()
    .open()
    .waitFor(1000)
    .andEmit(new WatchEvent(addedPod, "ADDED"))
    .waitFor(2000)
    .andEmit(outdatedEvent)
    .done()
    .once();

  KubernetesClient client = server.getClient();
  SharedInformerFactory factory = client.informers();
  SharedIndexInformer<Pod> podInformer = factory.sharedIndexInformerFor(Pod.class, PodList.class, 4000L);

  // The handler is deliberately empty; only the informer's sync status is under test.
  ResourceEventHandler<Pod> ignoreAllEvents = new ResourceEventHandler<Pod>() {
    @Override
    public void onAdd(Pod obj) { }

    @Override
    public void onUpdate(Pod oldObj, Pod newObj) { }

    @Override
    public void onDelete(Pod oldObj, boolean deletedFinalStateUnknown) { }
  };
  podInformer.addEventHandler(ignoreAllEvents);
  Thread.sleep(1000L);
  factory.startAllRegisteredInformers();

  // Wait long enough for the resync period to pass.
  Thread.sleep(8000L);
  assertEquals(false, podInformer.hasSynced());

  factory.stopAllRegisteredInformers();
}
}

0 comments on commit cc3c0d7

Please sign in to comment.