Skip to content

Commit

Permalink
fix #3090 #3091 refining the store interface and hasSynced
Browse files Browse the repository at this point in the history
splitting internal and mutative methods to SyncableStore

removing isPopulated and directly wiring the cache to be used
this adds an isRunning method to the SharedInformer.  this clarifies the
cache methods to be more like a map and the underlying map is now
concurrent to remove read locks
  • Loading branch information
shawkins authored and manusa committed Jun 9, 2021
1 parent f050493 commit 47310d7
Show file tree
Hide file tree
Showing 13 changed files with 309 additions and 292 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/
package io.fabric8.kubernetes.client.informers;

import io.fabric8.kubernetes.client.informers.cache.Store;
import io.fabric8.kubernetes.client.informers.cache.SyncableStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -29,10 +29,10 @@ public class ResyncRunnable<T> implements Runnable {

private static final Logger log = LoggerFactory.getLogger(ResyncRunnable.class);

private Store<T> store;
private SyncableStore<T> store;
private Supplier<Boolean> shouldResyncFunc;

public ResyncRunnable(Store<T> store, Supplier<Boolean> shouldResyncFunc) {
public ResyncRunnable(SyncableStore<T> store, Supplier<Boolean> shouldResyncFunc) {
this.store = store;
this.shouldResyncFunc = shouldResyncFunc;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,12 @@ public interface SharedInformer<T> {
*/
void stop();

boolean hasSynced();
/**
* Return true if the Informer has ever synced
*/
default boolean hasSynced() {
return lastSyncResourceVersion() != null;
}

/**
* The resource version observed when last synced with the underlying store.
Expand All @@ -71,4 +76,10 @@ public interface SharedInformer<T> {
* Return the class this informer is watching
*/
Class<T> getApiTypeClass();

/**
* Return true if the informer is actively watching
* May return false when {@link #isRunning()} is true when watch needs to be re-established.
*/
boolean isWatching();
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BooleanSupplier;
import java.util.function.Function;

Expand All @@ -46,15 +47,15 @@ public class Cache<T> implements Indexer<T> {
private Map<String, Function<T, List<String>>> indexers = new HashMap<>();

// items stores object instances
private Map<String, T> items = new HashMap<>();
private volatile ConcurrentHashMap<String, T> items = new ConcurrentHashMap<>();

// indices stores objects' key by their indices
private Map<String, Map<String, Set<String>>> indices = new HashMap<>();

private BooleanSupplier isRunning = () -> false;

public Cache() {
this(NAMESPACE_INDEX, Cache::metaNamespaceIndexFunc, Cache::deletionHandlingMetaNamespaceKeyFunc);
this(NAMESPACE_INDEX, Cache::metaNamespaceIndexFunc, Cache::metaNamespaceKeyFunc);
}

public Cache(String indexName, Function<T, List<String>> indexFunc, Function<T, String> keyFunc) {
Expand All @@ -67,19 +68,6 @@ public void setIsRunning(BooleanSupplier isRunning) {
this.isRunning = isRunning;
}

/**
* Add objects
*
* @param obj object
*/
@Override
public synchronized void add(T obj) {
String key = keyFunc.apply(obj);
T oldObj = this.items.get(key);
this.items.put(key, obj);
this.updateIndices(oldObj, obj, key);
}

/**
* Returns the indexers registered with the cache.
*
Expand Down Expand Up @@ -114,58 +102,57 @@ public void addIndexers(Map<String, Function<T, List<String>>> indexersNew) {
* Update the object.
*
* @param obj the object
* @return the old object
*/
@Override
public synchronized void update(T obj) {
String key = keyFunc.apply(obj);
T oldObj = this.items.get(key);
this.items.put(key, obj);
public synchronized T put(T obj) {
if (obj == null) {
return null;
}
String key = getKey(obj);
T oldObj = this.items.put(key, obj);
this.updateIndices(oldObj, obj, key);
return oldObj;
}

/**
* Delete the object.
*
* @param obj object
* @return the old object
*/
@Override
public synchronized void delete(T obj) {
String key = keyFunc.apply(obj);
boolean exists = this.items.containsKey(key);
if (exists) {
this.deleteFromIndices(this.items.get(key), key);
this.items.remove(key);
public synchronized T remove(T obj) {
String key = getKey(obj);
T old = this.items.remove(key);
if (old != null) {
this.deleteFromIndices(old, key);
}
return old;
}

/**
* Replace the content in the cache completely.
*
* Return a copy of the old cache contents
*
* @param list list of objects
* @param resourceVersion resource version
* @return
*/
@Override
public synchronized void replace(List<T> list, String resourceVersion) {
Map<String, T> newItems = new HashMap<>();
public synchronized Map<String, T> replace(List<T> list) {
ConcurrentHashMap<String, T> newItems = new ConcurrentHashMap<>();
for (T item : list) {
String key = keyFunc.apply(item);
String key = getKey(item);
newItems.put(key, item);
}
Map<String, T> result = new HashMap<>(items);
this.items = newItems;

// rebuild any index
this.indices = new HashMap<>();
for (Map.Entry<String, T> itemEntry : items.entrySet()) {
this.updateIndices(null, itemEntry.getValue(), itemEntry.getKey());
}
}

/**
* Resync
*/
@Override
public void resync() {
// Do nothing
return result;
}

/**
Expand All @@ -174,12 +161,8 @@ public void resync() {
* @return the list of keys
*/
@Override
public synchronized List<String> listKeys() {
List<String> keys = new ArrayList<>(this.items.size());
for (Map.Entry<String, T> entry : this.items.entrySet()) {
keys.add(entry.getKey());
}
return keys;
public List<String> listKeys() {
return new ArrayList<>(this.items.keySet());
}

/**
Expand All @@ -189,23 +172,24 @@ public synchronized List<String> listKeys() {
* @return the object
*/
@Override
public synchronized T get(T obj) {
String key = this.keyFunc.apply(obj);
public T get(T obj) {
String key = getKey(obj);
return this.getByKey(key);
}

public String getKey(T obj) {
String result = this.keyFunc.apply(obj);
return result == null ? "" : result;
}

/**
* List all objects in the cache.
*
* @return the list
*/
@Override
public synchronized List<T> list() {
List<T> itemList = new ArrayList<>(this.items.size());
for (Map.Entry<String, T> entry : this.items.entrySet()) {
itemList.add(entry.getValue());
}
return itemList;
public List<T> list() {
return new ArrayList<>(this.items.values());
}

/**
Expand All @@ -215,7 +199,7 @@ public synchronized List<T> list() {
* @return the get by key
*/
@Override
public synchronized T getByKey(String key) {
public T getByKey(String key) {
return this.items.get(key);
}

Expand All @@ -227,12 +211,12 @@ public synchronized T getByKey(String key) {
* @return the list
*/
@Override
public synchronized List<T> index(String indexName, Object obj) {
public synchronized List<T> index(String indexName, T obj) {
if (!this.indexers.containsKey(indexName)) {
throw new IllegalArgumentException(String.format("index %s doesn't exist!", indexName));
}
Function<T, List<String>> indexFunc = this.indexers.get(indexName);
List<String> indexKeys = indexFunc.apply((T) obj);
List<String> indexKeys = indexFunc.apply(obj);
Map<String, Set<String>> index = this.indices.get(indexName);
if (index.isEmpty()) {
return new ArrayList<>();
Expand Down Expand Up @@ -299,11 +283,6 @@ public synchronized List<T> byIndex(String indexName, String indexKey) {
return items;
}

@Override
public void isPopulated(boolean isPopulated) {
// Do nothing
}

/**
* UpdateIndices modifies the objects location in the managed indexes, if there is
* an update, you must provide an oldObj
Expand Down Expand Up @@ -374,18 +353,6 @@ public void addIndexFunc(String indexName, Function<T, List<String>> indexFunc)
this.indexers.put(indexName, indexFunc);
}

/**
* Checks for DeletedFinalStateUnknown objects before calling metaNamespaceKeyFunc
*
* @param object the specific object
* @param <T> object type
* @return the key
*/
public static <T> String deletionHandlingMetaNamespaceKeyFunc(T object) {
return metaNamespaceKeyFunc(object);
}


/**
* It's is a convenient default KeyFunc which know show to make keys for API
* objects which implement HasMetadata interface. The key uses the format
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,19 @@

package io.fabric8.kubernetes.client.informers.cache;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
* Wraps a {@link Store} to distribute events related to changes and syncs
* @param <T>
* Wraps a {@link Cache} and a {@link SharedProcessor} to distribute events related to changes and syncs
*/
public class ProcessorStore<T> implements Store<T> {
public class ProcessorStore<T> implements SyncableStore<T> {

private Store<T> actualStore;
private Cache<T> cache;
private SharedProcessor<T> processor;
private volatile boolean populated;

public ProcessorStore(Store<T> actualStore, SharedProcessor<T> processor) {
this.actualStore = actualStore;
public ProcessorStore(Cache<T> cache, SharedProcessor<T> processor) {
this.cache = cache;
this.processor = processor;
}

Expand All @@ -41,58 +38,50 @@ public void add(T obj) {
}

@Override
public synchronized void update(T obj) {
Object oldObj = this.actualStore.get(obj);
public void update(T obj) {
Object oldObj = this.cache.put(obj);
if (oldObj != null) {
this.actualStore.update(obj);
this.processor.distribute(new ProcessorListener.UpdateNotification(oldObj, obj), false);
} else {
this.actualStore.add(obj);
this.processor.distribute(new ProcessorListener.AddNotification(obj), false);
}
}

@Override
public synchronized void delete(T obj) {
Object oldObj = this.actualStore.get(obj);
public void delete(T obj) {
Object oldObj = this.cache.remove(obj);
if (oldObj != null) {
this.actualStore.delete(obj);
this.processor.distribute(new ProcessorListener.DeleteNotification(obj, false), false);
}
}

@Override
public List<T> list() {
return actualStore.list();
return cache.list();
}

@Override
public List<String> listKeys() {
return actualStore.listKeys();
return cache.listKeys();
}

@Override
public Object get(T object) {
return actualStore.get(object);
public T get(T object) {
return cache.get(object);
}

@Override
public T getByKey(String key) {
return actualStore.getByKey(key);
return cache.getByKey(key);
}

@Override
public synchronized void replace(List<T> list, String resourceVersion) {
// it shouldn't happen, but it's possible for metaNamespaceKeyFunc to return null, so manually collect
Map<String, T> oldState = new HashMap<>();
actualStore.list().stream().forEach(old -> oldState.put(Cache.metaNamespaceKeyFunc(old), old));

actualStore.replace(list, resourceVersion);
populated = true;
public void replace(List<T> list) {
Map<String, T> oldState = cache.replace(list);

// now that the store is up-to-date, process the notifications
for (T newValue : list) {
T old = oldState.remove(Cache.metaNamespaceKeyFunc(newValue));
T old = oldState.remove(cache.getKey(newValue));
if (old == null) {
this.processor.distribute(new ProcessorListener.AddNotification(newValue), true);
} else {
Expand All @@ -106,18 +95,8 @@ public synchronized void replace(List<T> list, String resourceVersion) {

@Override
public void resync() {
this.actualStore.list()
this.cache.list()
.forEach(i -> this.processor.distribute(new ProcessorListener.UpdateNotification<T>(i, i), true));
}

@Override
public void isPopulated(boolean isPopulated) {
this.populated = isPopulated;
}

@Override
public boolean hasSynced() {
return populated;
}

}
Loading

0 comments on commit 47310d7

Please sign in to comment.