Skip to content

Commit

Permalink
Added SharedInformerEventListener in SharedInformerFactory for handli…
Browse files Browse the repository at this point in the history
…ng exceptions

SharedInformerFactory would have a method called addSharedInformerEventListener() which would recieve
an interface to handle failures.
  • Loading branch information
rohanKanojia committed Feb 26, 2020
1 parent 66fa2ac commit fa30d17
Show file tree
Hide file tree
Showing 6 changed files with 64 additions and 24 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/**
* Copyright (C) 2015 Red Hat, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.fabric8.kubernetes.client.informers;

public interface SharedInformerEventListener {
void onException(Exception exception);
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.lang.reflect.Type;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

Expand All @@ -52,6 +53,8 @@ public class SharedInformerFactory extends BaseOperation {

private BaseOperation baseOperation;

private ConcurrentLinkedQueue<SharedInformerEventListener> eventListenerConcurrentLinkedQueue;

/**
* Constructor with thread pool specified.
*
Expand All @@ -65,6 +68,7 @@ public SharedInformerFactory(ExecutorService threadPool, OkHttpClient okHttpClie
this.informers = new HashMap<>();
this.startedInformers = new HashMap<>();
this.baseOperation = this.newInstance(context);
this.eventListenerConcurrentLinkedQueue = new ConcurrentLinkedQueue<>();
}

/**
Expand Down Expand Up @@ -113,7 +117,7 @@ public synchronized <T extends HasMetadata, TList extends KubernetesResourceList
*/
public synchronized <T extends HasMetadata, TList extends KubernetesResourceList<T>> SharedIndexInformer<T> sharedIndexInformerFor(Class<T> apiTypeClass, Class<TList> apiListTypeClass, OperationContext operationContext, long resyncPeriodInMillis) {
ListerWatcher<T, TList> listerWatcher = listerWatcherFor(apiTypeClass, apiListTypeClass);
SharedIndexInformer<T> informer = new DefaultSharedIndexInformer<T, TList>(apiTypeClass, listerWatcher, resyncPeriodInMillis, operationContext);
SharedIndexInformer<T> informer = new DefaultSharedIndexInformer<T, TList>(apiTypeClass, listerWatcher, resyncPeriodInMillis, operationContext, eventListenerConcurrentLinkedQueue);
this.informers.put(apiTypeClass, informer);
return informer;
}
Expand Down Expand Up @@ -198,4 +202,8 @@ public synchronized void stopAllRegisteredInformers(boolean shutDownThreadPool)
informerExecutor.shutdown();
}
}

public synchronized void addSharedInformerEventListener(SharedInformerEventListener event) {
this.eventListenerConcurrentLinkedQueue.add(event);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@
import io.fabric8.kubernetes.client.dsl.base.OperationContext;
import io.fabric8.kubernetes.client.informers.ListerWatcher;
import io.fabric8.kubernetes.client.informers.ResyncRunnable;
import io.fabric8.kubernetes.client.informers.SharedInformerEventListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.AbstractMap;
import java.util.Deque;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
Expand Down Expand Up @@ -70,16 +72,19 @@ public class Controller<T extends HasMetadata, L extends KubernetesResourceList<

private OperationContext operationContext;

private ConcurrentLinkedQueue<SharedInformerEventListener> eventListenerConcurrentLinkedQueue;

private Class<T> apiTypeClass;

public Controller(Class<T> apiTypeClass, DeltaFIFO<T> queue, ListerWatcher<T, L> listerWatcher, Consumer<Deque<AbstractMap.SimpleEntry<DeltaFIFO.DeltaType, Object>>> processFunc, Supplier<Boolean> resyncFunc, long fullResyncPeriod, OperationContext context) {
public Controller(Class<T> apiTypeClass, DeltaFIFO<T> queue, ListerWatcher<T, L> listerWatcher, Consumer<Deque<AbstractMap.SimpleEntry<DeltaFIFO.DeltaType, Object>>> processFunc, Supplier<Boolean> resyncFunc, long fullResyncPeriod, OperationContext context, ConcurrentLinkedQueue<SharedInformerEventListener> eventListenerConcurrentLinkedQueue) {
this.queue = queue;
this.listerWatcher = listerWatcher;
this.apiTypeClass = apiTypeClass;
this.processFunc = processFunc;
this.resyncFunc = resyncFunc;
this.fullResyncPeriod = fullResyncPeriod;
this.operationContext = context;
this.eventListenerConcurrentLinkedQueue = eventListenerConcurrentLinkedQueue;

// Starts one daemon thread for reflector
this.reflectExecutor = Executors.newSingleThreadScheduledExecutor();
Expand All @@ -88,10 +93,6 @@ public Controller(Class<T> apiTypeClass, DeltaFIFO<T> queue, ListerWatcher<T, L>
this.resyncExecutor = Executors.newSingleThreadScheduledExecutor();
}

public Controller(Class<T> apiTypeClass, DeltaFIFO<T> queue, ListerWatcher<T, L> listerWatcher, Consumer<Deque<AbstractMap.SimpleEntry<DeltaFIFO.DeltaType, Object>>> popProcessFunc, OperationContext context) {
this(apiTypeClass, queue, listerWatcher, popProcessFunc, null, 0, context);
}

public void run() {
log.info("informer#Controller: ready to run resync and reflector runnable");

Expand All @@ -104,19 +105,19 @@ public void run() {
}

try {
if (fullResyncPeriod > 0) {
reflector = new Reflector<>(apiTypeClass, listerWatcher, queue, operationContext, fullResyncPeriod);
} else {
reflector = new Reflector<>(apiTypeClass, listerWatcher, queue, operationContext, DEFAULT_PERIOD);
}
reflector.watch();
} catch (RejectedExecutionException e) {
log.warn("Reflector list-watching job exiting because the thread-pool is shutting down", e);
return;
if (fullResyncPeriod > 0) {
reflector = new Reflector<>(apiTypeClass, listerWatcher, queue, operationContext, fullResyncPeriod);
} else {
reflector = new Reflector<>(apiTypeClass, listerWatcher, queue, operationContext, DEFAULT_PERIOD);
}
reflector.listAndWatch();

// Start the process loop
this.processLoop();
} catch (Exception exception) {
log.warn("Reflector list-watching job exiting because the thread-pool is shutting down", exception);
this.eventListenerConcurrentLinkedQueue.forEach(listener -> listener.onException(exception));
}

// Start the process loop
this.processLoop();
}

/**
Expand Down Expand Up @@ -151,15 +152,16 @@ public String lastSyncResourceVersion() {
/**
* drains the work queue.
*/
private void processLoop() {
private void processLoop() throws Exception {
while (true) {
try {
this.queue.pop(this.processFunc);
} catch (InterruptedException t) {
log.error("DefaultController#processLoop got interrupted {}", t.getMessage(), t);
return;
} catch (Throwable t) {
log.error("DefaultController#processLoop recovered from crashing {} ", t.getMessage(), t);
} catch (Exception e) {
log.error("DefaultController#processLoop recovered from crashing {} ", e.getMessage(), e);
throw e;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ private L getList() {
}
}

public void watch() {
public void listAndWatch() throws Exception {
try {
log.info("Started ReflectorRunnable watch for {}", apiTypeClass);
reListAndSync();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.fabric8.kubernetes.client.dsl.base.OperationContext;
import io.fabric8.kubernetes.client.informers.ListerWatcher;
import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
import io.fabric8.kubernetes.client.informers.SharedInformerEventListener;
import io.fabric8.kubernetes.client.informers.cache.Cache;
import io.fabric8.kubernetes.client.informers.cache.Controller;
import io.fabric8.kubernetes.client.informers.cache.DeltaFIFO;
Expand All @@ -34,6 +35,7 @@
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Function;

public class DefaultSharedIndexInformer<T extends HasMetadata, L extends KubernetesResourceList<T>> implements SharedIndexInformer<T> {
Expand Down Expand Up @@ -61,7 +63,7 @@ public class DefaultSharedIndexInformer<T extends HasMetadata, L extends Kuberne
private volatile boolean started = false;
private volatile boolean stopped = false;

public DefaultSharedIndexInformer(Class<T> apiTypeClass, ListerWatcher<T, L> listerWatcher, long resyncPeriod, OperationContext context) {
public DefaultSharedIndexInformer(Class<T> apiTypeClass, ListerWatcher<T, L> listerWatcher, long resyncPeriod, OperationContext context, ConcurrentLinkedQueue<SharedInformerEventListener> eventListenerConcurrentLinkedQueue) {
this.resyncCheckPeriodMillis = resyncPeriod;
this.defaultEventHandlerResyncPeriod = resyncPeriod;

Expand All @@ -70,7 +72,7 @@ public DefaultSharedIndexInformer(Class<T> apiTypeClass, ListerWatcher<T, L> lis

DeltaFIFO<T> fifo = new DeltaFIFO<>(Cache::metaNamespaceKeyFunc, this.indexer);

this.controller = new Controller<>(apiTypeClass, fifo, listerWatcher, this::handleDeltas, processor::shouldResync, resyncCheckPeriodMillis, context);
this.controller = new Controller<>(apiTypeClass, fifo, listerWatcher, this::handleDeltas, processor::shouldResync, resyncCheckPeriodMillis, context, eventListenerConcurrentLinkedQueue);
controllerThread = new Thread(controller::run, "informer-controller-" + apiTypeClass.getSimpleName());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.fabric8.kubernetes.client.dsl.base.CustomResourceDefinitionContext;
import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
import io.fabric8.kubernetes.client.informers.SharedInformerEventListener;
import io.fabric8.kubernetes.client.informers.SharedInformerFactory;
import io.fabric8.kubernetes.examples.crds.Dummy;
import io.fabric8.kubernetes.examples.crds.DummyList;
Expand Down Expand Up @@ -64,6 +65,13 @@ public void onDelete(Dummy pod, boolean deletedFinalStateUnknown) {
}
);

sharedInformerFactory.addSharedInformerEventListener(new SharedInformerEventListener() {
@Override
public void onException(Exception exception) {
System.out.println("Exception occurred, but caught");
}
});

logger.info("Starting all registered informers");
sharedInformerFactory.startAllRegisteredInformers();

Expand Down

0 comments on commit fa30d17

Please sign in to comment.