Skip to content

Commit

Permalink
Fix #2514: SharedIndexInformer watches only pods of its own namespace…
Browse files Browse the repository at this point in the history
… when run in the cluster
  • Loading branch information
rohanKanojia committed Oct 9, 2020
1 parent 579d972 commit 77f74db
Show file tree
Hide file tree
Showing 17 changed files with 495 additions and 274 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@
### 4.12-SNAPSHOT

#### Bugs
* Fix #2514: SharedIndexInformer watches only pods of its own namespace when run in the cluster

#### Improvements

#### Dependency Upgrade

#### New Features

* Fix #2531: Allow setting the maximum concurrent requests via system property / environment variable

### 4.12.0 (2020-10-02)
Expand Down

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -327,8 +327,8 @@ private List<RollableScalableResource<ReplicaSet, DoneableReplicaSet>> doGetLog(
null, context.getApiGroupName(), context.getApiGroupVersion(), context.getCascading(), null, context.getLabels(),
context.getLabelsNot(), context.getLabelsIn(), context.getLabelsNotIn(), context.getFields(), context.getFieldsNot(),
context.getResourceVersion(), context.getReloadingFromServer(), context.getGracePeriodSeconds(), context.getPropagationPolicy(),
context.getWatchRetryInitialBackoffMillis(), context.getWatchRetryBackoffMultiplier(), false, 0, null
), podLogWaitTimeout);
context.getWatchRetryInitialBackoffMillis(), context.getWatchRetryBackoffMultiplier(), false, 0, null,
context.isNamespaceFromGlobalConfig()), podLogWaitTimeout);
ReplicaSetList rcList = rsOperations.withLabels(getDeploymentSelectorLabels(deployment)).list();

for (ReplicaSet rs : rcList.getItems()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -333,8 +333,8 @@ private List<RollableScalableResource<ReplicaSet, DoneableReplicaSet>> doGetLog(
null, context.getApiGroupName(), context.getApiGroupVersion(), context.getCascading(), null, context.getLabels(),
context.getLabelsNot(), context.getLabelsIn(), context.getLabelsNotIn(), context.getFields(), context.getFieldsNot(),
context.getResourceVersion(), context.getReloadingFromServer(), context.getGracePeriodSeconds(), context.getPropagationPolicy(),
context.getWatchRetryInitialBackoffMillis(), context.getWatchRetryBackoffMultiplier(), false, 0, null
), podLogWaitTimeout);
context.getWatchRetryInitialBackoffMillis(), context.getWatchRetryBackoffMultiplier(), false, 0, null,
context.isNamespaceFromGlobalConfig()), podLogWaitTimeout);
ReplicaSetList rcList = rsOperations.withLabels(getDeploymentSelectorLabels(deployment)).list();


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import io.fabric8.kubernetes.api.model.KubernetesResourceList;
import io.fabric8.kubernetes.api.model.ListOptions;
import io.fabric8.kubernetes.client.Config;
import io.fabric8.kubernetes.client.KubernetesClientException;
import io.fabric8.kubernetes.client.ConfigBuilder;
import io.fabric8.kubernetes.client.Watch;
import io.fabric8.kubernetes.client.Watcher;
import io.fabric8.kubernetes.client.dsl.base.BaseOperation;
Expand Down Expand Up @@ -78,6 +78,8 @@ public SharedInformerFactory(ExecutorService threadPool, OkHttpClient okHttpClie
* Constructs and returns a shared index informer with resync period specified. And the
* informer cache will be overwritten.
*
* <b>Note:</b>It watches for events in <b>ALL NAMESPACES</b>.
*
* @param apiTypeClass apiType class
* @param apiListTypeClass api list type class
* @param resyncPeriodInMillis resync period in milliseconds
Expand All @@ -88,12 +90,15 @@ public SharedInformerFactory(ExecutorService threadPool, OkHttpClient okHttpClie
public synchronized <T extends HasMetadata, L extends KubernetesResourceList<T>> SharedIndexInformer<T> sharedIndexInformerFor(Class<T> apiTypeClass, Class<L> apiListTypeClass, long resyncPeriodInMillis) {
return sharedIndexInformerFor(apiTypeClass, apiListTypeClass, context.withApiGroupName(Utils.getAnnotationValue(apiTypeClass, ApiGroup.class))
.withApiGroupVersion(Utils.getAnnotationValue(apiTypeClass, ApiVersion.class))
.withPlural(getPluralFromKind(apiTypeClass.getSimpleName())), resyncPeriodInMillis);
.withPlural(getPluralFromKind(apiTypeClass.getSimpleName()))
.withIsNamespaceConfiguredFromGlobalConfig(context.isNamespaceFromGlobalConfig()), resyncPeriodInMillis);
}

/**
* Constructs and returns a shared index informer with resync period specified for custom resources.
*
* <b>Note:</b>It watches for events in <b>ALL NAMESPACES</b>.
*
* @param customResourceContext basic information about the Custom Resource Definition corresponding to that custom resource
* @param apiTypeClass apiType class
* @param apiListTypeClass api list type class
Expand All @@ -105,11 +110,14 @@ public synchronized <T extends HasMetadata, L extends KubernetesResourceList<T>>
public synchronized <T extends HasMetadata, L extends KubernetesResourceList<T>> SharedIndexInformer<T> sharedIndexInformerForCustomResource(CustomResourceDefinitionContext customResourceContext, Class<T> apiTypeClass, Class<L> apiListTypeClass, long resyncPeriodInMillis) {
return sharedIndexInformerFor(apiTypeClass, apiListTypeClass, context.withApiGroupVersion(customResourceContext.getVersion())
.withApiGroupName(customResourceContext.getGroup())
.withPlural(customResourceContext.getPlural()), resyncPeriodInMillis);
.withPlural(customResourceContext.getPlural())
.withIsNamespaceConfiguredFromGlobalConfig(context.isNamespaceFromGlobalConfig()), resyncPeriodInMillis);
}

/**
* Constructs and returns a shared index informer with resync period specified for custom resources.
* Constructs and returns a shared index informer with resync period specified for custom resources. You can use this
* method to specify namespace in {@link OperationContext} if you want to monitor for events in a dedicated namespace
* only or provide other filtering options.
*
* @param customResourceContext basic information about the Custom Resource Definition corresponding to that custom resource
* @param apiTypeClass apiType class
Expand All @@ -129,7 +137,8 @@ public synchronized <T extends HasMetadata, L extends KubernetesResourceList<T>>

/**
* Constructs and returns a shared index informer with resync period specified. And the
* informer cache will be overwritten.
* informer cache will be overwritten. You can use this method to specify namespace in {@link OperationContext}
* if you want to monitor for events in a dedicated namespace only or provide other filtering options.
*
* @param apiTypeClass apiType class
* @param apiListTypeClass api list type class
Expand All @@ -153,19 +162,15 @@ private <T extends HasMetadata, L extends KubernetesResourceList<T>> ListerWatch

return new ListerWatcher<T, L>() {
@Override
public L list(ListOptions params, String namespace, OperationContext context) throws KubernetesClientException {
BaseOperation<T, L, ?, ?> listBaseOperation = baseOperation.newInstance(context.withNamespace(namespace));
listBaseOperation.setType(apiTypeClass);
listBaseOperation.setListType(apiListTypeClass);
public L list(ListOptions params, String namespace, OperationContext context) {
BaseOperation<T, L, ?, ?> listBaseOperation = getConfiguredBaseOperation(namespace, context, apiTypeClass, apiListTypeClass);

return listBaseOperation.list();
}

@Override
public Watch watch(ListOptions params, String namespace, OperationContext context, Watcher<T> resourceWatcher) throws KubernetesClientException {
BaseOperation<T, L, ?, ?> watchBaseOperation = baseOperation.newInstance(context);
watchBaseOperation.setType(apiTypeClass);
watchBaseOperation.setListType(apiListTypeClass);
public Watch watch(ListOptions params, String namespace, OperationContext context, Watcher<T> resourceWatcher) {
BaseOperation<T, L, ?, ?> watchBaseOperation = getConfiguredBaseOperation(namespace, context, apiTypeClass, apiListTypeClass);

// Register Custom Kind in case of CustomResource
if (context.getApiGroupName() != null && context.getApiGroupVersion() != null) {
Expand Down Expand Up @@ -235,4 +240,23 @@ public synchronized void stopAllRegisteredInformers(boolean shutDownThreadPool)
public void addSharedInformerEventListener(SharedInformerEventListener event) {
this.eventListeners.add(event);
}

private <T extends HasMetadata, L extends KubernetesResourceList<T>> BaseOperation<T, L, ?, ?> getConfiguredBaseOperation(String namespace, OperationContext context, Class<T> apiTypeClass, Class<L> apiListTypeClass) {
BaseOperation<T, L, ?, ?> baseOperationWithContext;
// Avoid adding Namespace if it's picked from Global Configuration
if (context.isNamespaceFromGlobalConfig()) {
// SharedInformer default behavior is to watch in all namespaces
// unless we specify namespace explicitly in OperationContext
baseOperationWithContext = baseOperation.newInstance(context
.withConfig(new ConfigBuilder(config)
.withNamespace(null)
.build())
.withNamespace(null));
} else {
baseOperationWithContext = baseOperation.newInstance(context.withNamespace(namespace));
}
baseOperationWithContext.setType(apiTypeClass);
baseOperationWithContext.setListType(apiListTypeClass);
return baseOperationWithContext;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public static PodOperationsImpl getGenericPodOperations(OperationContext context
"v1", context.getCascading(), context.getItem(), context.getLabels(), context.getLabelsNot(),
context.getLabelsIn(), context.getLabelsNotIn(), context.getFields(), context.getFieldsNot(), context.getResourceVersion(),
context.getReloadingFromServer(), context.getGracePeriodSeconds(), context.getPropagationPolicy(),
context.getWatchRetryInitialBackoffMillis(), context.getWatchRetryBackoffMultiplier(), null, null, null, null, null,
context.getWatchRetryInitialBackoffMillis(), context.getWatchRetryBackoffMultiplier(), context.isNamespaceFromGlobalConfig(), null, null, null, null, null,
null, null, null, null, false, false, false, null, null,
null, isPretty, null, null, null, null, null, podLogWaitTimeout));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
package io.fabric8.kubernetes.client.dsl.base;

import io.fabric8.kubernetes.api.model.DeletionPropagation;
import io.fabric8.kubernetes.api.model.batch.Job;
import io.fabric8.kubernetes.api.model.batch.JobBuilder;
import io.fabric8.kubernetes.client.Config;
import okhttp3.OkHttpClient;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

import java.util.Collections;

import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.when;

class OperationContextTest {
private OkHttpClient okHttpClient;
private Config config;

@BeforeEach
public void setUp() {
this.okHttpClient = Mockito.mock(OkHttpClient.class, Mockito.RETURNS_DEEP_STUBS);
this.config = Mockito.mock(Config.class, Mockito.RETURNS_DEEP_STUBS);
}


@Test
void testNamespaceIsSetFromGlobalConfiguration() {
// Given
OperationContext operationContext = new OperationContext();
when(config.getNamespace()).thenReturn("namespace-from-config");

// When
operationContext = operationContext
.withConfig(config);

// Then
assertNotNull(operationContext);
assertTrue(operationContext.isNamespaceFromGlobalConfig());
assertEquals("namespace-from-config", operationContext.getNamespace());
}

@Test
void testNamespaceIsSetFromOperationContext() {
// Given
OperationContext operationContext = new OperationContext();
when(config.getNamespace()).thenReturn("namespace-from-config");

// When
operationContext = operationContext
.withNamespace("operation-namespace")
.withConfig(config);

// Then
assertNotNull(operationContext);
assertFalse(operationContext.isNamespaceFromGlobalConfig());
assertEquals("operation-namespace", operationContext.getNamespace());
}

@Test
void testCompeteOperationContext() {
// Given
OperationContext operationContext = new OperationContext();

// When
operationContext = operationContext.withNamespace("operation-namespace")
.withName("operand-name")
.withConfig(config)
.withApiGroupName("batch")
.withApiGroupVersion("v1")
.withOkhttpClient(okHttpClient)
.withPlural("jobs")
.withItem(new JobBuilder().withNewMetadata().withName("testItem").endMetadata().build())
.withCascading(false)
.withLabels(Collections.singletonMap("test", "labels"))
.withLabelsIn(Collections.singletonMap("test", new String[]{"labelsIn1", "labelsIn2"}))
.withLabelsNot(Collections.singletonMap("test", new String[]{"labelsNot"}))
.withLabelsNotIn(Collections.singletonMap("test", new String[]{"labelsNotIn"}))
.withFields(Collections.singletonMap("test", "field"))
.withFieldsNot(Collections.singletonMap("test", new String[]{"fieldsNot"}))
.withResourceVersion("234343")
.withReloadingFromServer(false)
.withGracePeriodSeconds(0)
.withPropagationPolicy(DeletionPropagation.BACKGROUND)
.withWatchRetryInitialBackoffMillis(0)
.withWatchRetryBackoffMultiplier(1.0F);

// Then
assertNotNull(operationContext);
assertEquals("operation-namespace", operationContext.getNamespace());
assertEquals("operand-name", operationContext.getName());
assertEquals("batch", operationContext.getApiGroupName());
assertEquals("v1", operationContext.getApiGroupVersion());
assertEquals("jobs", operationContext.getPlural());
assertNotNull(operationContext.getItem());
assertTrue(operationContext.getItem() instanceof Job);
assertFalse(operationContext.getCascading());
assertEquals("labels", operationContext.getLabels().get("test"));
assertArrayEquals(new String[]{"labelsIn1", "labelsIn2"}, operationContext.getLabelsIn().get("test"));
assertArrayEquals(new String[]{"labelsNot"}, operationContext.getLabelsNot().get("test"));
assertArrayEquals(new String[]{"labelsNotIn"}, operationContext.getLabelsNotIn().get("test"));
assertEquals("field", operationContext.getFields().get("test"));
assertArrayEquals(new String[]{ "fieldsNot"}, operationContext.getFieldsNot().get("test"));
assertEquals("234343", operationContext.getResourceVersion());
assertFalse(operationContext.getReloadingFromServer());
assertEquals(0, operationContext.getGracePeriodSeconds());
assertEquals(DeletionPropagation.BACKGROUND, operationContext.getPropagationPolicy());
assertEquals(0, operationContext.getWatchRetryInitialBackoffMillis());
assertEquals(1.0F, operationContext.getWatchRetryBackoffMultiplier());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import io.fabric8.kubernetes.client.dsl.base.CustomResourceDefinitionContext;
import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
import io.fabric8.kubernetes.client.informers.SharedInformerEventListener;
import io.fabric8.kubernetes.client.informers.SharedInformerFactory;
import io.fabric8.kubernetes.examples.crds.Dummy;
import io.fabric8.kubernetes.examples.crds.DummyList;
Expand All @@ -33,7 +32,7 @@
public class CustomResourceInformerExample {
private static final Logger logger = LoggerFactory.getLogger(CustomResourceInformerExample.class);

public static void main(String args[]) {
public static void main(String[] args) {
try (KubernetesClient client = new DefaultKubernetesClient()) {
CustomResourceDefinitionContext crdContext = new CustomResourceDefinitionContext.Builder()
.withVersion("v1")
Expand Down Expand Up @@ -65,12 +64,7 @@ public void onDelete(Dummy pod, boolean deletedFinalStateUnknown) {
}
);

sharedInformerFactory.addSharedInformerEventListener(new SharedInformerEventListener() {
@Override
public void onException(Exception exception) {
System.out.println("Exception occurred, but caught");
}
});
sharedInformerFactory.addSharedInformerEventListener(exception -> System.out.println("Exception occurred, but caught"));

logger.info("Starting all registered informers");
sharedInformerFactory.startAllRegisteredInformers();
Expand All @@ -80,16 +74,18 @@ public void onException(Exception exception) {
try {
for (;;) {
logger.info("podInformer.hasSynced() : {}", podInformer.hasSynced());
Thread.sleep(200);
Thread.sleep(10 * 1000);
}
} catch (InterruptedException inEx) {
Thread.currentThread().interrupt();
logger.info("HAS_SYNCED_THREAD INTERRUPTED!");
}
});

// Wait for some time now
TimeUnit.MINUTES.sleep(60);
} catch (InterruptedException interruptedException) {
Thread.currentThread().interrupt();
logger.info("interrupted: {}", interruptedException.getMessage());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import java.util.concurrent.TimeUnit;

public class InformerExample {
private static Logger logger = LoggerFactory.getLogger(InformerExample.class);
private static final Logger logger = LoggerFactory.getLogger(InformerExample.class);

public static void main(String[] args) throws InterruptedException {
try (final KubernetesClient client = new DefaultKubernetesClient()) {
Expand Down
Loading

0 comments on commit 77f74db

Please sign in to comment.