Skip to content

Commit

Permalink
Fix fabric8io#2514: SharedIndexInformer watches only pods of its own …
Browse files Browse the repository at this point in the history
…namespace when run in the cluster
  • Loading branch information
rohanKanojia committed Nov 9, 2020
1 parent 209507b commit c21b135
Show file tree
Hide file tree
Showing 17 changed files with 487 additions and 271 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
* Fix #2517: Replace does not work in CRUD mockwebserver
* Fix #2537: Checking for Readiness of DeploymentConfig
* Fix #2300: Remove job extensions/v1beta1 from backward compatibiliy interceptor
* Fix #2514: SharedIndexInformer watches only pods of its own namespace when run in the cluster

#### Improvements
* Fix #2507: Add a test for creating a Job with generateName
Expand All @@ -16,7 +17,6 @@
* Fix #2513: Update Kubernetes Model to v1.19.1

#### New Features

* Fix #2531: Allow setting the maximum concurrent requests via system property / environment variable
* Fix #2534: Tekton model based on Tekton Pipeline 0.17.0
* Fix #2574: Add support for Condition type
Expand Down

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -327,8 +327,8 @@ private List<RollableScalableResource<ReplicaSet, DoneableReplicaSet>> doGetLog(
null, context.getApiGroupName(), context.getApiGroupVersion(), context.getCascading(), null, context.getLabels(),
context.getLabelsNot(), context.getLabelsIn(), context.getLabelsNotIn(), context.getFields(), context.getFieldsNot(),
context.getResourceVersion(), context.getReloadingFromServer(), context.getGracePeriodSeconds(), context.getPropagationPolicy(),
context.getWatchRetryInitialBackoffMillis(), context.getWatchRetryBackoffMultiplier(), false, 0, null
), podLogWaitTimeout);
context.getWatchRetryInitialBackoffMillis(), context.getWatchRetryBackoffMultiplier(), false, 0, null,
context.isNamespaceFromGlobalConfig()), podLogWaitTimeout);
ReplicaSetList rcList = rsOperations.withLabels(getDeploymentSelectorLabels(deployment)).list();

for (ReplicaSet rs : rcList.getItems()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -333,8 +333,8 @@ private List<RollableScalableResource<ReplicaSet, DoneableReplicaSet>> doGetLog(
null, context.getApiGroupName(), context.getApiGroupVersion(), context.getCascading(), null, context.getLabels(),
context.getLabelsNot(), context.getLabelsIn(), context.getLabelsNotIn(), context.getFields(), context.getFieldsNot(),
context.getResourceVersion(), context.getReloadingFromServer(), context.getGracePeriodSeconds(), context.getPropagationPolicy(),
context.getWatchRetryInitialBackoffMillis(), context.getWatchRetryBackoffMultiplier(), false, 0, null
), podLogWaitTimeout);
context.getWatchRetryInitialBackoffMillis(), context.getWatchRetryBackoffMultiplier(), false, 0, null,
context.isNamespaceFromGlobalConfig()), podLogWaitTimeout);
ReplicaSetList rcList = rsOperations.withLabels(getDeploymentSelectorLabels(deployment)).list();


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.fabric8.kubernetes.api.model.KubernetesResourceList;
import io.fabric8.kubernetes.api.model.ListOptions;
import io.fabric8.kubernetes.client.Config;
import io.fabric8.kubernetes.client.ConfigBuilder;
import io.fabric8.kubernetes.client.Watch;
import io.fabric8.kubernetes.client.Watcher;
import io.fabric8.kubernetes.client.dsl.base.BaseOperation;
Expand Down Expand Up @@ -66,6 +67,7 @@ public class SharedInformerFactory extends BaseOperation {
*/
public SharedInformerFactory(ExecutorService threadPool, OkHttpClient okHttpClient, Config configuration) {
super(new OperationContext().withOkhttpClient(okHttpClient).withConfig(configuration));
initOperationContext(configuration);
this.informerExecutor = threadPool;
this.informers = new HashMap<>();
this.startedInformers = new HashMap<>();
Expand All @@ -77,6 +79,8 @@ public SharedInformerFactory(ExecutorService threadPool, OkHttpClient okHttpClie
* Constructs and returns a shared index informer with resync period specified. And the
* informer cache will be overwritten.
*
* <b>Note:</b>It watches for events in <b>ALL NAMESPACES</b>.
*
* @param apiTypeClass apiType class
* @param apiListTypeClass api list type class
* @param resyncPeriodInMillis resync period in milliseconds
Expand All @@ -87,12 +91,15 @@ public SharedInformerFactory(ExecutorService threadPool, OkHttpClient okHttpClie
public synchronized <T extends HasMetadata, L extends KubernetesResourceList<T>> SharedIndexInformer<T> sharedIndexInformerFor(Class<T> apiTypeClass, Class<L> apiListTypeClass, long resyncPeriodInMillis) {
return sharedIndexInformerFor(apiTypeClass, apiListTypeClass, context.withApiGroupName(Utils.getAnnotationValue(apiTypeClass, ApiGroup.class))
.withApiGroupVersion(Utils.getAnnotationValue(apiTypeClass, ApiVersion.class))
.withPlural(getPluralFromKind(apiTypeClass.getSimpleName())), resyncPeriodInMillis);
.withPlural(getPluralFromKind(apiTypeClass.getSimpleName()))
.withIsNamespaceConfiguredFromGlobalConfig(context.isNamespaceFromGlobalConfig()), resyncPeriodInMillis);
}

/**
* Constructs and returns a shared index informer with resync period specified for custom resources.
*
* <b>Note:</b>It watches for events in <b>ALL NAMESPACES</b>.
*
* @param customResourceContext basic information about the Custom Resource Definition corresponding to that custom resource
* @param apiTypeClass apiType class
* @param apiListTypeClass api list type class
Expand All @@ -104,11 +111,14 @@ public synchronized <T extends HasMetadata, L extends KubernetesResourceList<T>>
public synchronized <T extends HasMetadata, L extends KubernetesResourceList<T>> SharedIndexInformer<T> sharedIndexInformerForCustomResource(CustomResourceDefinitionContext customResourceContext, Class<T> apiTypeClass, Class<L> apiListTypeClass, long resyncPeriodInMillis) {
return sharedIndexInformerFor(apiTypeClass, apiListTypeClass, context.withApiGroupVersion(customResourceContext.getVersion())
.withApiGroupName(customResourceContext.getGroup())
.withPlural(customResourceContext.getPlural()), resyncPeriodInMillis);
.withPlural(customResourceContext.getPlural())
.withIsNamespaceConfiguredFromGlobalConfig(context.isNamespaceFromGlobalConfig()), resyncPeriodInMillis);
}

/**
* Constructs and returns a shared index informer with resync period specified for custom resources.
* Constructs and returns a shared index informer with resync period specified for custom resources. You can use this
* method to specify namespace in {@link OperationContext} if you want to monitor for events in a dedicated namespace
* only or provide other filtering options.
*
* @param customResourceContext basic information about the Custom Resource Definition corresponding to that custom resource
* @param apiTypeClass apiType class
Expand All @@ -128,7 +138,8 @@ public synchronized <T extends HasMetadata, L extends KubernetesResourceList<T>>

/**
* Constructs and returns a shared index informer with resync period specified. And the
* informer cache will be overwritten.
* informer cache will be overwritten. You can use this method to specify namespace in {@link OperationContext}
* if you want to monitor for events in a dedicated namespace only or provide other filtering options.
*
* @param apiTypeClass apiType class
* @param apiListTypeClass api list type class
Expand All @@ -153,18 +164,14 @@ private <T extends HasMetadata, L extends KubernetesResourceList<T>> ListerWatch
return new ListerWatcher<T, L>() {
@Override
public L list(ListOptions params, String namespace, OperationContext context) {
BaseOperation<T, L, ?, ?> listBaseOperation = baseOperation.newInstance(context.withNamespace(namespace));
listBaseOperation.setType(apiTypeClass);
listBaseOperation.setListType(apiListTypeClass);
BaseOperation<T, L, ?, ?> listBaseOperation = getConfiguredBaseOperation(namespace, context, apiTypeClass, apiListTypeClass);

return listBaseOperation.list();
}

@Override
public Watch watch(ListOptions params, String namespace, OperationContext context, Watcher<T> resourceWatcher) {
BaseOperation<T, L, ?, ?> watchBaseOperation = baseOperation.newInstance(context);
watchBaseOperation.setType(apiTypeClass);
watchBaseOperation.setListType(apiListTypeClass);
BaseOperation<T, L, ?, ?> watchBaseOperation = getConfiguredBaseOperation(namespace, context, apiTypeClass, apiListTypeClass);

// Register Custom Kind in case of CustomResource
if (context.getApiGroupName() != null && context.getApiGroupVersion() != null) {
Expand Down Expand Up @@ -234,4 +241,29 @@ public synchronized void stopAllRegisteredInformers(boolean shutDownThreadPool)
public void addSharedInformerEventListener(SharedInformerEventListener event) {
this.eventListeners.add(event);
}

private <T extends HasMetadata, L extends KubernetesResourceList<T>> BaseOperation<T, L, ?, ?> getConfiguredBaseOperation(String namespace, OperationContext context, Class<T> apiTypeClass, Class<L> apiListTypeClass) {
BaseOperation<T, L, ?, ?> baseOperationWithContext;
// Avoid adding Namespace if it's picked from Global Configuration
if (context.isNamespaceFromGlobalConfig()) {
// SharedInformer default behavior is to watch in all namespaces
// unless we specify namespace explicitly in OperationContext
baseOperationWithContext = baseOperation.newInstance(context
.withConfig(new ConfigBuilder(config)
.withNamespace(null)
.build())
.withNamespace(null));
} else {
baseOperationWithContext = baseOperation.newInstance(context.withNamespace(namespace));
}
baseOperationWithContext.setType(apiTypeClass);
baseOperationWithContext.setListType(apiListTypeClass);
return baseOperationWithContext;
}

private void initOperationContext(Config configuration) {
if (configuration.getNamespace() != null) {
context = context.withIsNamespaceConfiguredFromGlobalConfig(true);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public static PodOperationsImpl getGenericPodOperations(OperationContext context
"v1", context.getCascading(), context.getItem(), context.getLabels(), context.getLabelsNot(),
context.getLabelsIn(), context.getLabelsNotIn(), context.getFields(), context.getFieldsNot(), context.getResourceVersion(),
context.getReloadingFromServer(), context.getGracePeriodSeconds(), context.getPropagationPolicy(),
context.getWatchRetryInitialBackoffMillis(), context.getWatchRetryBackoffMultiplier(), null, null, null, null, null,
context.getWatchRetryInitialBackoffMillis(), context.getWatchRetryBackoffMultiplier(), context.isNamespaceFromGlobalConfig(), null, null, null, null, null,
null, null, null, null, false, false, false, null, null,
null, isPretty, null, null, null, null, null, podLogWaitTimeout));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/**
* Copyright (C) 2015 Red Hat, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.fabric8.kubernetes.client.dsl.base;

import io.fabric8.kubernetes.api.model.DeletionPropagation;
import io.fabric8.kubernetes.api.model.batch.Job;
import io.fabric8.kubernetes.api.model.batch.JobBuilder;
import io.fabric8.kubernetes.client.Config;
import okhttp3.OkHttpClient;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

import java.util.Collections;

import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.when;

class OperationContextTest {
private OkHttpClient okHttpClient;
private Config config;

@BeforeEach
public void setUp() {
this.okHttpClient = Mockito.mock(OkHttpClient.class, Mockito.RETURNS_DEEP_STUBS);
this.config = Mockito.mock(Config.class, Mockito.RETURNS_DEEP_STUBS);
}

@Test
void testNamespaceIsSetFromOperationContext() {
// Given
OperationContext operationContext = new OperationContext();
when(config.getNamespace()).thenReturn("namespace-from-config");

// When
operationContext = operationContext
.withNamespace("operation-namespace")
.withConfig(config);

// Then
assertNotNull(operationContext);
assertFalse(operationContext.isNamespaceFromGlobalConfig());
assertEquals("operation-namespace", operationContext.getNamespace());
}

@Test
void testCompeteOperationContext() {
// Given
OperationContext operationContext = new OperationContext();

// When
operationContext = operationContext.withNamespace("operation-namespace")
.withName("operand-name")
.withConfig(config)
.withApiGroupName("batch")
.withApiGroupVersion("v1")
.withOkhttpClient(okHttpClient)
.withPlural("jobs")
.withItem(new JobBuilder().withNewMetadata().withName("testItem").endMetadata().build())
.withCascading(false)
.withLabels(Collections.singletonMap("test", "labels"))
.withLabelsIn(Collections.singletonMap("test", new String[]{"labelsIn1", "labelsIn2"}))
.withLabelsNot(Collections.singletonMap("test", new String[]{"labelsNot"}))
.withLabelsNotIn(Collections.singletonMap("test", new String[]{"labelsNotIn"}))
.withFields(Collections.singletonMap("test", "field"))
.withFieldsNot(Collections.singletonMap("test", new String[]{"fieldsNot"}))
.withResourceVersion("234343")
.withReloadingFromServer(false)
.withGracePeriodSeconds(0)
.withPropagationPolicy(DeletionPropagation.BACKGROUND)
.withWatchRetryInitialBackoffMillis(0)
.withWatchRetryBackoffMultiplier(1.0F);

// Then
assertNotNull(operationContext);
assertEquals("operation-namespace", operationContext.getNamespace());
assertEquals("operand-name", operationContext.getName());
assertEquals("batch", operationContext.getApiGroupName());
assertEquals("v1", operationContext.getApiGroupVersion());
assertEquals("jobs", operationContext.getPlural());
assertNotNull(operationContext.getItem());
assertTrue(operationContext.getItem() instanceof Job);
assertFalse(operationContext.getCascading());
assertEquals("labels", operationContext.getLabels().get("test"));
assertArrayEquals(new String[]{"labelsIn1", "labelsIn2"}, operationContext.getLabelsIn().get("test"));
assertArrayEquals(new String[]{"labelsNot"}, operationContext.getLabelsNot().get("test"));
assertArrayEquals(new String[]{"labelsNotIn"}, operationContext.getLabelsNotIn().get("test"));
assertEquals("field", operationContext.getFields().get("test"));
assertArrayEquals(new String[]{ "fieldsNot"}, operationContext.getFieldsNot().get("test"));
assertEquals("234343", operationContext.getResourceVersion());
assertFalse(operationContext.getReloadingFromServer());
assertEquals(0, operationContext.getGracePeriodSeconds());
assertEquals(DeletionPropagation.BACKGROUND, operationContext.getPropagationPolicy());
assertEquals(0, operationContext.getWatchRetryInitialBackoffMillis());
assertEquals(1.0F, operationContext.getWatchRetryBackoffMultiplier());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import io.fabric8.kubernetes.client.dsl.base.CustomResourceDefinitionContext;
import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
import io.fabric8.kubernetes.client.informers.SharedInformerEventListener;
import io.fabric8.kubernetes.client.informers.SharedInformerFactory;
import io.fabric8.kubernetes.examples.crds.Dummy;
import io.fabric8.kubernetes.examples.crds.DummyList;
Expand All @@ -33,7 +32,7 @@
public class CustomResourceInformerExample {
private static final Logger logger = LoggerFactory.getLogger(CustomResourceInformerExample.class);

public static void main(String args[]) {
public static void main(String[] args) {
try (KubernetesClient client = new DefaultKubernetesClient()) {
CustomResourceDefinitionContext crdContext = new CustomResourceDefinitionContext.Builder()
.withVersion("v1")
Expand Down Expand Up @@ -65,12 +64,7 @@ public void onDelete(Dummy pod, boolean deletedFinalStateUnknown) {
}
);

sharedInformerFactory.addSharedInformerEventListener(new SharedInformerEventListener() {
@Override
public void onException(Exception exception) {
System.out.println("Exception occurred, but caught");
}
});
sharedInformerFactory.addSharedInformerEventListener(exception -> logger.error("Exception occurred, but caught"));

logger.info("Starting all registered informers");
sharedInformerFactory.startAllRegisteredInformers();
Expand All @@ -80,16 +74,18 @@ public void onException(Exception exception) {
try {
for (;;) {
logger.info("podInformer.hasSynced() : {}", podInformer.hasSynced());
Thread.sleep(200);
Thread.sleep(10 * 1000L);
}
} catch (InterruptedException inEx) {
Thread.currentThread().interrupt();
logger.info("HAS_SYNCED_THREAD INTERRUPTED!");
}
});

// Wait for some time now
TimeUnit.MINUTES.sleep(60);
} catch (InterruptedException interruptedException) {
Thread.currentThread().interrupt();
logger.info("interrupted: {}", interruptedException.getMessage());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import java.util.concurrent.TimeUnit;

public class InformerExample {
private static Logger logger = LoggerFactory.getLogger(InformerExample.class);
private static final Logger logger = LoggerFactory.getLogger(InformerExample.class);

public static void main(String[] args) throws InterruptedException {
try (final KubernetesClient client = new DefaultKubernetesClient()) {
Expand Down
Loading

0 comments on commit c21b135

Please sign in to comment.