diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/ResyncRunnable.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/ResyncRunnable.java index bd0730862b9..ba4c9788a3d 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/ResyncRunnable.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/ResyncRunnable.java @@ -15,7 +15,7 @@ */ package io.fabric8.kubernetes.client.informers; -import io.fabric8.kubernetes.client.informers.cache.Store; +import io.fabric8.kubernetes.client.informers.cache.SyncableStore; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -29,10 +29,10 @@ public class ResyncRunnable implements Runnable { private static final Logger log = LoggerFactory.getLogger(ResyncRunnable.class); - private Store store; + private SyncableStore store; private Supplier shouldResyncFunc; - public ResyncRunnable(Store store, Supplier shouldResyncFunc) { + public ResyncRunnable(SyncableStore store, Supplier shouldResyncFunc) { this.store = store; this.shouldResyncFunc = shouldResyncFunc; } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/SharedInformer.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/SharedInformer.java index 03becd2c52a..3c1657b5ead 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/SharedInformer.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/SharedInformer.java @@ -51,7 +51,12 @@ public interface SharedInformer { */ void stop(); - boolean hasSynced(); + /** + * Return true if the Informer has ever synced + */ + default boolean hasSynced() { + return lastSyncResourceVersion() != null; + } /** * The resource version observed when last synced with the underlying store. @@ -71,4 +76,10 @@ public interface SharedInformer { * Return the class this informer is watching */ Class getApiTypeClass(); + + /** + * Return true if the informer is actively watching + * May return false when {@link #isRunning()} is true when watch needs to be re-established. + */ + boolean isWatching(); } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/Cache.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/Cache.java index 22802694dff..9dc69f69b06 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/Cache.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/Cache.java @@ -27,6 +27,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.function.BooleanSupplier; import java.util.function.Function; @@ -46,7 +47,7 @@ public class Cache implements Indexer { private Map>> indexers = new HashMap<>(); // items stores object instances - private Map items = new HashMap<>(); + private volatile ConcurrentHashMap items = new ConcurrentHashMap<>(); // indices stores objects' key by their indices private Map>> indices = new HashMap<>(); @@ -54,7 +55,7 @@ public class Cache implements Indexer { private BooleanSupplier isRunning = () -> false; public Cache() { - this(NAMESPACE_INDEX, Cache::metaNamespaceIndexFunc, Cache::deletionHandlingMetaNamespaceKeyFunc); + this(NAMESPACE_INDEX, Cache::metaNamespaceIndexFunc, Cache::metaNamespaceKeyFunc); } public Cache(String indexName, Function> indexFunc, Function keyFunc) { @@ -67,19 +68,6 @@ public void setIsRunning(BooleanSupplier isRunning) { this.isRunning = isRunning; } - /** - * Add objects - * - * @param obj object - */ - @Override - public synchronized void add(T obj) { - String key = keyFunc.apply(obj); - T oldObj = this.items.get(key); - this.items.put(key, obj); - this.updateIndices(oldObj, obj, key); - } - /** * Returns the indexers registered with the cache. * @@ -114,43 +102,49 @@ public void addIndexers(Map>> indexersNew) { * Update the object. * * @param obj the object + * @return the old object */ - @Override - public synchronized void update(T obj) { - String key = keyFunc.apply(obj); - T oldObj = this.items.get(key); - this.items.put(key, obj); + public synchronized T put(T obj) { + if (obj == null) { + return null; + } + String key = getKey(obj); + T oldObj = this.items.put(key, obj); this.updateIndices(oldObj, obj, key); + return oldObj; } /** * Delete the object. * * @param obj object + * @return the old object */ - @Override - public synchronized void delete(T obj) { - String key = keyFunc.apply(obj); - boolean exists = this.items.containsKey(key); - if (exists) { - this.deleteFromIndices(this.items.get(key), key); - this.items.remove(key); + public synchronized T remove(T obj) { + String key = getKey(obj); + T old = this.items.remove(key); + if (old != null) { + this.deleteFromIndices(old, key); } + return old; } /** * Replace the content in the cache completely. + * + * Return a copy of the old cache contents * * @param list list of objects * @param resourceVersion resource version + * @return */ - @Override - public synchronized void replace(List list, String resourceVersion) { - Map newItems = new HashMap<>(); + public synchronized Map replace(List list) { + ConcurrentHashMap newItems = new ConcurrentHashMap<>(); for (T item : list) { - String key = keyFunc.apply(item); + String key = getKey(item); newItems.put(key, item); } + Map result = new HashMap<>(items); this.items = newItems; // rebuild any index @@ -158,14 +152,7 @@ public synchronized void replace(List list, String resourceVersion) { for (Map.Entry itemEntry : items.entrySet()) { this.updateIndices(null, itemEntry.getValue(), itemEntry.getKey()); } - } - - /** - * Resync - */ - @Override - public void resync() { - // Do nothing + return result; } /** @@ -174,12 +161,8 @@ public void resync() { * @return the list of keys */ @Override - public synchronized List listKeys() { - List keys = new ArrayList<>(this.items.size()); - for (Map.Entry entry : this.items.entrySet()) { - keys.add(entry.getKey()); - } - return keys; + public List listKeys() { + return new ArrayList<>(this.items.keySet()); } /** @@ -189,23 +172,24 @@ public synchronized List listKeys() { * @return the object */ @Override - public synchronized T get(T obj) { - String key = this.keyFunc.apply(obj); + public T get(T obj) { + String key = getKey(obj); return this.getByKey(key); } + public String getKey(T obj) { + String result = this.keyFunc.apply(obj); + return result == null ? "" : result; + } + /** * List all objects in the cache. * * @return the list */ @Override - public synchronized List list() { - List itemList = new ArrayList<>(this.items.size()); - for (Map.Entry entry : this.items.entrySet()) { - itemList.add(entry.getValue()); - } - return itemList; + public List list() { + return new ArrayList<>(this.items.values()); } /** @@ -215,7 +199,7 @@ public synchronized List list() { * @return the get by key */ @Override - public synchronized T getByKey(String key) { + public T getByKey(String key) { return this.items.get(key); } @@ -227,12 +211,12 @@ public synchronized T getByKey(String key) { * @return the list */ @Override - public synchronized List index(String indexName, Object obj) { + public synchronized List index(String indexName, T obj) { if (!this.indexers.containsKey(indexName)) { throw new IllegalArgumentException(String.format("index %s doesn't exist!", indexName)); } Function> indexFunc = this.indexers.get(indexName); - List indexKeys = indexFunc.apply((T) obj); + List indexKeys = indexFunc.apply(obj); Map> index = this.indices.get(indexName); if (index.isEmpty()) { return new ArrayList<>(); @@ -299,11 +283,6 @@ public synchronized List byIndex(String indexName, String indexKey) { return items; } - @Override - public void isPopulated(boolean isPopulated) { - // Do nothing - } - /** * UpdateIndices modifies the objects location in the managed indexes, if there is * an update, you must provide an oldObj @@ -374,18 +353,6 @@ public void addIndexFunc(String indexName, Function> indexFunc) this.indexers.put(indexName, indexFunc); } - /** - * Checks for DeletedFinalStateUnknown objects before calling metaNamespaceKeyFunc - * - * @param object the specific object - * @param object type - * @return the key - */ - public static String deletionHandlingMetaNamespaceKeyFunc(T object) { - return metaNamespaceKeyFunc(object); - } - - /** * It's is a convenient default KeyFunc which know show to make keys for API * objects which implement HasMetadata interface. The key uses the format diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/ProcessorStore.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/ProcessorStore.java index 51aeeb00fdc..7ecb1a18094 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/ProcessorStore.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/ProcessorStore.java @@ -16,22 +16,19 @@ package io.fabric8.kubernetes.client.informers.cache; -import java.util.HashMap; import java.util.List; import java.util.Map; /** - * Wraps a {@link Store} to distribute events related to changes and syncs - * @param + * Wraps a {@link Cache} and a {@link SharedProcessor} to distribute events related to changes and syncs */ -public class ProcessorStore implements Store { +public class ProcessorStore implements SyncableStore { - private Store actualStore; + private Cache cache; private SharedProcessor processor; - private volatile boolean populated; - public ProcessorStore(Store actualStore, SharedProcessor processor) { - this.actualStore = actualStore; + public ProcessorStore(Cache cache, SharedProcessor processor) { + this.cache = cache; this.processor = processor; } @@ -41,58 +38,50 @@ public void add(T obj) { } @Override - public synchronized void update(T obj) { - Object oldObj = this.actualStore.get(obj); + public void update(T obj) { + Object oldObj = this.cache.put(obj); if (oldObj != null) { - this.actualStore.update(obj); this.processor.distribute(new ProcessorListener.UpdateNotification(oldObj, obj), false); } else { - this.actualStore.add(obj); this.processor.distribute(new ProcessorListener.AddNotification(obj), false); } } @Override - public synchronized void delete(T obj) { - Object oldObj = this.actualStore.get(obj); + public void delete(T obj) { + Object oldObj = this.cache.remove(obj); if (oldObj != null) { - this.actualStore.delete(obj); this.processor.distribute(new ProcessorListener.DeleteNotification(obj, false), false); } } @Override public List list() { - return actualStore.list(); + return cache.list(); } @Override public List listKeys() { - return actualStore.listKeys(); + return cache.listKeys(); } @Override - public Object get(T object) { - return actualStore.get(object); + public T get(T object) { + return cache.get(object); } @Override public T getByKey(String key) { - return actualStore.getByKey(key); + return cache.getByKey(key); } @Override - public synchronized void replace(List list, String resourceVersion) { - // it shouldn't happen, but it's possible for metaNamespaceKeyFunc to return null, so manually collect - Map oldState = new HashMap<>(); - actualStore.list().stream().forEach(old -> oldState.put(Cache.metaNamespaceKeyFunc(old), old)); - - actualStore.replace(list, resourceVersion); - populated = true; + public void replace(List list) { + Map oldState = cache.replace(list); // now that the store is up-to-date, process the notifications for (T newValue : list) { - T old = oldState.remove(Cache.metaNamespaceKeyFunc(newValue)); + T old = oldState.remove(cache.getKey(newValue)); if (old == null) { this.processor.distribute(new ProcessorListener.AddNotification(newValue), true); } else { @@ -106,18 +95,8 @@ public synchronized void replace(List list, String resourceVersion) { @Override public void resync() { - this.actualStore.list() + this.cache.list() .forEach(i -> this.processor.distribute(new ProcessorListener.UpdateNotification(i, i), true)); } - @Override - public void isPopulated(boolean isPopulated) { - this.populated = isPopulated; - } - - @Override - public boolean hasSynced() { - return populated; - } - } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/Reflector.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/Reflector.java index 25ab1c0dca3..5e86f5f5529 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/Reflector.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/Reflector.java @@ -18,7 +18,10 @@ import io.fabric8.kubernetes.api.model.HasMetadata; import io.fabric8.kubernetes.api.model.KubernetesResourceList; import io.fabric8.kubernetes.api.model.ListOptionsBuilder; +import io.fabric8.kubernetes.client.KubernetesClientException; import io.fabric8.kubernetes.client.Watch; +import io.fabric8.kubernetes.client.Watcher; +import io.fabric8.kubernetes.client.WatcherException; import io.fabric8.kubernetes.client.dsl.base.OperationContext; import io.fabric8.kubernetes.client.informers.ListerWatcher; import org.slf4j.Logger; @@ -30,26 +33,26 @@ public class Reflector lastSyncResourceVersion; + private volatile String lastSyncResourceVersion; private final Class apiTypeClass; private final ListerWatcher listerWatcher; - private final Store store; + private final SyncableStore store; private final OperationContext operationContext; - private final ReflectorWatcher watcher; - private volatile boolean running = true; + private final ReflectorWatcher watcher; + private volatile boolean running; + private volatile boolean watching; private final AtomicReference watch; - public Reflector(Class apiTypeClass, ListerWatcher listerWatcher, Store store, OperationContext operationContext) { + public Reflector(Class apiTypeClass, ListerWatcher listerWatcher, SyncableStore store, OperationContext operationContext) { this.apiTypeClass = apiTypeClass; this.listerWatcher = listerWatcher; this.store = store; this.operationContext = operationContext; - this.lastSyncResourceVersion = new AtomicReference<>(); - this.watcher = new ReflectorWatcher<>(store, lastSyncResourceVersion, this::listSyncAndWatch); + this.watcher = new ReflectorWatcher(); this.watch = new AtomicReference<>(null); } - private L getList() { + protected L getList() { return listerWatcher.list(new ListOptionsBuilder() .withWatch(Boolean.FALSE) .withResourceVersion(null) @@ -65,8 +68,9 @@ private synchronized void stopWatcher() { Watch theWatch = watch.getAndSet(null); if (theWatch != null) { String ns = operationContext.getNamespace(); - log.debug("Stopping watcher for resource {} v{} in namespace {}", apiTypeClass, lastSyncResourceVersion.get(), ns); + log.debug("Stopping watcher for resource {} v{} in namespace {}", apiTypeClass, lastSyncResourceVersion, ns); theWatch.close(); + watchStopped(); // proactively report as stopped } } @@ -75,12 +79,13 @@ private synchronized void stopWatcher() { *
Should be called only at start and when HttpGone is seen. */ public void listSyncAndWatch() { - store.isPopulated(false); + running = true; + watching = false; final L list = getList(); final String latestResourceVersion = list.getMetadata().getResourceVersion(); - lastSyncResourceVersion.set(list.getMetadata().getResourceVersion()); + lastSyncResourceVersion = latestResourceVersion; log.debug("Listing items ({}) for resource {} v{}", list.getItems().size(), apiTypeClass, latestResourceVersion); - store.replace(list.getItems(), latestResourceVersion); + store.replace(list.getItems()); startWatcher(latestResourceVersion); } @@ -94,13 +99,76 @@ private synchronized void startWatcher(final String latestResourceVersion) { listerWatcher.watch(new ListOptionsBuilder() .withWatch(Boolean.TRUE).withResourceVersion(latestResourceVersion).withTimeoutSeconds(null).build(), operationContext.getNamespace(), operationContext, watcher)); + watching = true; + } + + private synchronized void watchStopped() { + watching = false; } public String getLastSyncResourceVersion() { - return lastSyncResourceVersion.get(); + return lastSyncResourceVersion; } public boolean isRunning() { return running; } + + public boolean isWatching() { + return watching; + } + + class ReflectorWatcher implements Watcher { + + @Override + public void eventReceived(Action action, T resource) { + if (action == null) { + throw new KubernetesClientException("Unrecognized event"); + } + if (resource == null) { + throw new KubernetesClientException("Unrecognized resource"); + } + if (log.isDebugEnabled()) { + log.debug("Event received {} {}# resourceVersion {}", action.name(), resource.getKind(), resource.getMetadata().getResourceVersion()); + } + switch (action) { + case ERROR: + throw new KubernetesClientException("ERROR event"); + case ADDED: + store.add(resource); + break; + case MODIFIED: + store.update(resource); + break; + case DELETED: + store.delete(resource); + break; + } + lastSyncResourceVersion = resource.getMetadata().getResourceVersion(); + } + + @Override + public void onClose(WatcherException exception) { + watchStopped(); + // this close was triggered by an exception, + // not the user, it is expected that the watch retry will handle this + log.warn("Watch closing with exception", exception); + if (exception.isHttpGone()) { + listSyncAndWatch(); + } + } + + @Override + public void onClose() { + watchStopped(); + log.debug("Watch gracefully closed"); + } + + @Override + public boolean reconnecting() { + return true; + } + + } + } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/ReflectorWatcher.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/ReflectorWatcher.java deleted file mode 100644 index a1cf0f43099..00000000000 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/ReflectorWatcher.java +++ /dev/null @@ -1,88 +0,0 @@ -/** - * Copyright (C) 2015 Red Hat, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.fabric8.kubernetes.client.informers.cache; - -import io.fabric8.kubernetes.api.model.HasMetadata; -import io.fabric8.kubernetes.client.KubernetesClientException; -import io.fabric8.kubernetes.client.Watcher; -import io.fabric8.kubernetes.client.WatcherException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.concurrent.atomic.AtomicReference; - -public class ReflectorWatcher implements Watcher { - - private static final Logger log = LoggerFactory.getLogger(ReflectorWatcher.class); - - private final Store store; - private final AtomicReference lastSyncResourceVersion; - private final Runnable onHttpGone; - - public ReflectorWatcher(Store store, AtomicReference lastSyncResourceVersion, Runnable onHttpGone) { - this.store = store; - this.lastSyncResourceVersion = lastSyncResourceVersion; - this.onHttpGone = onHttpGone; - } - - @Override - public void eventReceived(Action action, T resource) { - if (action == null) { - throw new KubernetesClientException("Unrecognized event"); - } - if (resource == null) { - throw new KubernetesClientException("Unrecognized resource"); - } - if (log.isDebugEnabled()) { - log.debug("Event received {} {}# resourceVersion {}", action.name(), resource.getKind(), resource.getMetadata().getResourceVersion()); - } - switch (action) { - case ERROR: - throw new KubernetesClientException("ERROR event"); - case ADDED: - store.add(resource); - break; - case MODIFIED: - store.update(resource); - break; - case DELETED: - store.delete(resource); - break; - } - lastSyncResourceVersion.set(resource.getMetadata().getResourceVersion()); - } - - @Override - public void onClose(WatcherException exception) { - // this close was triggered by an exception, - // not the user, it is expected that the watch retry will handle this - log.warn("Watch closing with exception", exception); - if (exception.isHttpGone()) { - onHttpGone.run(); - } - } - - @Override - public void onClose() { - log.debug("Watch gracefully closed"); - } - - @Override - public boolean reconnecting() { - return true; - } - -} diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/Store.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/Store.java index 741d25426be..3dd3838bfe7 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/Store.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/Store.java @@ -28,30 +28,12 @@ * define the contract for obtaining objects by some arbitrary key type. * * This is ported from official go client: https://github.com/kubernetes/client-go/blob/master/tools/cache/store.go + * + *
Refactored to only expose read methods * * @param resource */ public interface Store { - /** - * Inserts an item into the store - * - * @param obj object - */ - void add(T obj); - - /** - * Sets an item in the store to its updated state. - * - * @param obj object - */ - void update(T obj); - - /** - * Removes an item from the store - * - * @param obj object - */ - void delete(T obj); /** * Returns a list of all the items. @@ -73,7 +55,7 @@ public interface Store { * @param object object * @return requested item if exists. */ - Object get(T object); + T get(T object); /** * Returns the request item with specific key. @@ -83,33 +65,4 @@ public interface Store { */ T getByKey(String key); - /** - * Deletes the contents of the store, using instead the given list. - * Store takes ownership of the list, you should not reference it - * after calling this function - * - * @param list list of objects - * @param resourceVersion resource version - */ - void replace(List list, String resourceVersion); - - /** - * Sends a resync event for each item. - */ - void resync(); - - /** - * Updates the status of cache in case of any API error from Kubernetes server - * - * @param isPopulated boolean value indicating whether cache is populated or not - */ - void isPopulated(boolean isPopulated); - - /** - * true if synced - */ - default boolean hasSynced() { - return true; - } - } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/SyncableStore.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/SyncableStore.java new file mode 100644 index 00000000000..e58a169b41f --- /dev/null +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/SyncableStore.java @@ -0,0 +1,57 @@ +/** + * Copyright (C) 2015 Red Hat, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.fabric8.kubernetes.client.informers.cache; + +import java.util.List; + +public interface SyncableStore extends Store { + + /** + * Inserts an item into the store + * + * @param obj object + */ + void add(T obj); + + /** + * Sets an item in the store to its updated state. + * + * @param obj object + */ + void update(T obj); + + /** + * Removes an item from the store + * + * @param obj object + */ + void delete(T obj); + + /** + * Deletes the contents of the store, using instead the given list. + * Store takes ownership of the list, you should not reference it + * after calling this function + * + * @param list list of objects + */ + void replace(List list); + + /** + * Sends a resync event for each item. + */ + void resync(); + +} diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformer.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformer.java index 1e58746b48b..51fb1255c77 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformer.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformer.java @@ -176,11 +176,6 @@ private synchronized void stopResync() { } } - @Override - public boolean hasSynced() { - return this.processorStore.hasSynced(); - } - @Override public void addIndexers(Map>> indexers) { indexer.addIndexers(indexers); @@ -206,7 +201,12 @@ private long determineResyncPeriod(long desired, long check) { public boolean isRunning() { return !stopped && started.get() && reflector.isRunning(); } - + + @Override + public boolean isWatching() { + return reflector.isWatching(); + } + synchronized void scheduleResync(Supplier resyncFunc) { // schedule the resync runnable if (resyncCheckPeriodMillis > 0) { diff --git a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/cache/CacheTest.java b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/cache/CacheTest.java index 4e6bfe057e4..9c7df9c99b7 100644 --- a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/cache/CacheTest.java +++ b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/cache/CacheTest.java @@ -34,8 +34,8 @@ class CacheTest { void testCacheIndex() { Pod testPodObj = new PodBuilder().withNewMetadata().withName("test-pod").endMetadata().build(); - cache.add(testPodObj); - cache.replace(Arrays.asList(testPodObj), "0"); + cache.put(testPodObj); + cache.replace(Arrays.asList(testPodObj)); String index = mockIndexFunction(testPodObj).get(0); String key = mockKeyFunction(testPodObj); @@ -56,18 +56,18 @@ void testCacheStore() { Pod testPodObj = new PodBuilder().withNewMetadata().withName("test-pod2").endMetadata().build(); String index = mockIndexFunction(testPodObj).get(0); - cache.replace(Arrays.asList(testPodObj), "0"); - cache.delete(testPodObj); + cache.replace(Arrays.asList(testPodObj)); + cache.remove(testPodObj); List indexedObjectList = cache.byIndex("mock", index); assertEquals(0, indexedObjectList.size()); - cache.add(testPodObj); + cache.put(testPodObj); // replace cached object with null value String newClusterName = "test_cluster"; testPodObj.getMetadata().setClusterName(newClusterName); - cache.update(testPodObj); + cache.put(testPodObj); assertEquals(1, cache.list().size()); assertEquals(newClusterName, testPodObj.getMetadata().getClusterName()); @@ -77,7 +77,7 @@ void testCacheStore() { void testDefaultNamespaceIndex() { Pod testPodObj = new PodBuilder().withNewMetadata().withName("test-pod3").withNamespace("default").endMetadata().build(); - cache.add(testPodObj); + cache.put(testPodObj); List indices = Cache.metaNamespaceIndexFunc(testPodObj); assertEquals(testPodObj.getMetadata().getNamespace(), indices.get(0)); } @@ -86,7 +86,7 @@ void testDefaultNamespaceIndex() { void testDefaultNamespaceKey() { Pod testPodObj = new PodBuilder().withNewMetadata().withName("test-pod4").withNamespace("default").endMetadata().build(); - cache.add(testPodObj); + cache.put(testPodObj); assertEquals("", Cache.metaNamespaceKeyFunc(null)); assertEquals("default/test-pod4", Cache.metaNamespaceKeyFunc(testPodObj)); assertEquals("default/test-pod4", Cache.namespaceKeyFunc("default", "test-pod4")); @@ -114,7 +114,7 @@ void testAddIndexers() { .withNewMetadata().withNamespace("test").withName("test-pod").withClusterName("test-cluster").endMetadata() .withNewSpec().withNodeName("test-node").endSpec() .build(); - podCache.add(testPod); + podCache.put(testPod); List namespaceIndexedPods = podCache.byIndex(Cache.NAMESPACE_INDEX, "test"); assertEquals(1, namespaceIndexedPods.size()); diff --git a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/cache/ListerTest.java b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/cache/ListerTest.java index f5b3d43eaec..2ae9bff3d1a 100644 --- a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/cache/ListerTest.java +++ b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/cache/ListerTest.java @@ -38,7 +38,7 @@ void testListerBasic() { new PodBuilder().withNewMetadata().withName("foo1").withNamespace("default").endMetadata().build(), new PodBuilder().withNewMetadata().withName("foo2").withNamespace("default").endMetadata().build(), new PodBuilder().withNewMetadata().withName("foo3").withNamespace("default").endMetadata().build() - ), "0"); + )); List namespacedPodList = namespacedPodLister.list(); assertEquals(3, namespacedPodList.size()); diff --git a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/cache/ProcessorStoreTest.java b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/cache/ProcessorStoreTest.java index 7829c8ef84e..2bb7f6b108f 100644 --- a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/cache/ProcessorStoreTest.java +++ b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/cache/ProcessorStoreTest.java @@ -38,11 +38,11 @@ public class ProcessorStoreTest { public void testEvents() { ArgumentCaptor> notificationCaptor = ArgumentCaptor.forClass(Notification.class); ArgumentCaptor syncCaptor = ArgumentCaptor.forClass(Boolean.class); - Store podStore = Mockito.mock(Store.class); + Cache podCache = Mockito.mock(Cache.class); SharedProcessor processor = Mockito.mock(SharedProcessor.class); - ProcessorStore processorStore = new ProcessorStore<>(podStore, processor); - Pod pod = new Pod(); + ProcessorStore processorStore = new ProcessorStore<>(podCache, processor); + Pod pod = new PodBuilder().withNewMetadata().withName("pod").endMetadata().build(); // add notification processorStore.add(pod); @@ -54,10 +54,11 @@ public void testEvents() { processorStore.delete(pod); // update notification - Mockito.when(podStore.get(pod)).thenReturn(pod); + Mockito.when(podCache.put(pod)).thenReturn(pod); processorStore.update(pod); // delete notification + Mockito.when(podCache.remove(pod)).thenReturn(pod); processorStore.delete(pod); Mockito.verify(processor, Mockito.times(4)).distribute(notificationCaptor.capture(), syncCaptor.capture()); @@ -76,23 +77,22 @@ public void testEvents() { public void testSyncEvents() { ArgumentCaptor> notificationCaptor = ArgumentCaptor.forClass(Notification.class); ArgumentCaptor syncCaptor = ArgumentCaptor.forClass(Boolean.class); - Store podStore = Mockito.mock(Store.class); + Cache podCache = new Cache<>(); SharedProcessor processor = Mockito.mock(SharedProcessor.class); - ProcessorStore processorStore = new ProcessorStore<>(podStore, processor); + ProcessorStore processorStore = new ProcessorStore<>(podCache, processor); Pod pod = new PodBuilder().withNewMetadata().endMetadata().build(); Pod pod2 = new PodBuilder().withNewMetadata().withName("pod2").endMetadata().build(); // replace two values with an empty store - processorStore.replace(Arrays.asList(pod, pod2), null); + processorStore.replace(Arrays.asList(pod, pod2)); // resync two values - Mockito.when(podStore.list()).thenReturn(Arrays.asList(pod, pod2)); processorStore.resync(); // relist with deletes - processorStore.replace(Collections.emptyList(), null); + processorStore.replace(Collections.emptyList()); Mockito.verify(processor, Mockito.times(6)).distribute(notificationCaptor.capture(), syncCaptor.capture()); diff --git a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/cache/ReflectorTest.java b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/cache/ReflectorTest.java new file mode 100644 index 00000000000..ae72e742cc9 --- /dev/null +++ b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/cache/ReflectorTest.java @@ -0,0 +1,70 @@ +/** + * Copyright (C) 2015 Red Hat, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.fabric8.kubernetes.client.informers.cache; + +import io.fabric8.kubernetes.api.model.Pod; +import io.fabric8.kubernetes.api.model.PodList; +import io.fabric8.kubernetes.api.model.PodListBuilder; +import io.fabric8.kubernetes.client.KubernetesClientException; +import io.fabric8.kubernetes.client.Watch; +import io.fabric8.kubernetes.client.dsl.base.OperationContext; +import io.fabric8.kubernetes.client.informers.ListerWatcher; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +import static org.junit.Assert.assertTrue; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.fail; + +class ReflectorTest { + + @Test + void testStateFlags() { + ListerWatcher mock = Mockito.mock(ListerWatcher.class); + PodList list = new PodListBuilder().withNewMetadata().withResourceVersion("1").endMetadata().build(); + Mockito.when(mock.list(Mockito.any(), Mockito.any(), Mockito.any())).thenReturn(list); + + Reflector reflector = + new Reflector<>(Pod.class, mock, Mockito.mock(SyncableStore.class), new OperationContext()); + + assertFalse(reflector.isWatching()); + assertFalse(reflector.isRunning()); + + // throw an exception, then watch normally + Mockito.when(mock.watch(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any())) + .thenThrow(new KubernetesClientException("error")) + .thenReturn(Mockito.mock(Watch.class)); + + assertThrows(KubernetesClientException.class, () -> reflector.listSyncAndWatch()); + + // running but watch failed + assertFalse(reflector.isWatching()); + assertTrue(reflector.isRunning()); + + reflector.listSyncAndWatch(); + + assertTrue(reflector.isWatching()); + assertTrue(reflector.isRunning()); + + reflector.stop(); + + assertFalse(reflector.isWatching()); + assertFalse(reflector.isRunning()); + } + +}