Skip to content

Commit

Permalink
refactor: Refactored ProcessorListener and RefelectorRunnable
Browse files Browse the repository at this point in the history
Fixed license
  • Loading branch information
manusa authored and rohanKanojia committed Feb 25, 2020
1 parent 27c71e9 commit 66fa2ac
Show file tree
Hide file tree
Showing 11 changed files with 307 additions and 299 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
*/
package io.fabric8.kubernetes.client;

import io.fabric8.kubernetes.api.model.Status;

public interface Watcher<T> {

void eventReceived(Action action, T resource);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package io.fabric8.kubernetes.client.informers;

import io.fabric8.kubernetes.api.model.ListOptions;
import io.fabric8.kubernetes.client.KubernetesClientException;
import io.fabric8.kubernetes.client.Watch;
import io.fabric8.kubernetes.client.Watcher;
import io.fabric8.kubernetes.client.dsl.base.OperationContext;
Expand All @@ -26,10 +25,10 @@
* start a watch on a resource.
*
* @param <T> type
* @param <TList> list for that type
* @param <L> list for that type
*/
public interface ListerWatcher<T, TList> {
Watch watch(ListOptions params, String namespace, OperationContext context, Watcher<T> watcher) throws KubernetesClientException;
public interface ListerWatcher<T, L> {
Watch watch(ListOptions params, String namespace, OperationContext context, Watcher<T> watcher);

TList list(ListOptions params, String namespace, OperationContext context) throws KubernetesClientException;
L list(ListOptions params, String namespace, OperationContext context);
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.api.model.KubernetesResourceList;
import io.fabric8.kubernetes.client.dsl.base.BaseOperation;
import io.fabric8.kubernetes.client.dsl.base.OperationContext;
import io.fabric8.kubernetes.client.informers.ListerWatcher;
import io.fabric8.kubernetes.client.informers.ResyncRunnable;
Expand All @@ -40,18 +39,11 @@
* This is taken from https://github.com/kubernetes-client/java/blob/master/util/src/main/java/io/kubernetes/client/informer/cache/Controller.java
* which has been ported from official go client: https://github.com/kubernetes/client-go/blob/master/tools/cache/controller.go
*/
public class Controller<T extends HasMetadata, TList extends KubernetesResourceList<T>> {
public class Controller<T extends HasMetadata, L extends KubernetesResourceList<T>> {
private static final Logger log = LoggerFactory.getLogger(Controller.class);

private static final long DEFAULT_PERIOD = 5000L;

/**
* Period controls the timing between one watch ending
* and the beginning of the next one in milliseconds.
* It's one second by default as done in go client
*/
private static final long DEFAULT_DELAY_PERIOD = 1000L;

/**
* resync fifo internals in millis
*/
Expand All @@ -62,9 +54,9 @@ public class Controller<T extends HasMetadata, TList extends KubernetesResourceL
*/
private DeltaFIFO<T> queue;

private ListerWatcher<T, TList> listerWatcher;
private ListerWatcher<T, L> listerWatcher;

private ReflectorRunnable<T, TList> reflector;
private Reflector<T, L> reflector;

private Supplier<Boolean> resyncFunc;

Expand All @@ -80,9 +72,7 @@ public class Controller<T extends HasMetadata, TList extends KubernetesResourceL

private Class<T> apiTypeClass;

private ScheduledFuture reflectorFuture;

public Controller(Class<T> apiTypeClass, DeltaFIFO<T> queue, ListerWatcher<T, TList> listerWatcher, Consumer<Deque<AbstractMap.SimpleEntry<DeltaFIFO.DeltaType, Object>>> processFunc, Supplier<Boolean> resyncFunc, long fullResyncPeriod, OperationContext context) {
public Controller(Class<T> apiTypeClass, DeltaFIFO<T> queue, ListerWatcher<T, L> listerWatcher, Consumer<Deque<AbstractMap.SimpleEntry<DeltaFIFO.DeltaType, Object>>> processFunc, Supplier<Boolean> resyncFunc, long fullResyncPeriod, OperationContext context) {
this.queue = queue;
this.listerWatcher = listerWatcher;
this.apiTypeClass = apiTypeClass;
Expand All @@ -98,7 +88,7 @@ public Controller(Class<T> apiTypeClass, DeltaFIFO<T> queue, ListerWatcher<T, TL
this.resyncExecutor = Executors.newSingleThreadScheduledExecutor();
}

public Controller(Class<T> apiTypeClass, DeltaFIFO<T> queue, ListerWatcher<T, TList> listerWatcher, Consumer<Deque<AbstractMap.SimpleEntry<DeltaFIFO.DeltaType, Object>>> popProcessFunc, OperationContext context) {
public Controller(Class<T> apiTypeClass, DeltaFIFO<T> queue, ListerWatcher<T, L> listerWatcher, Consumer<Deque<AbstractMap.SimpleEntry<DeltaFIFO.DeltaType, Object>>> popProcessFunc, OperationContext context) {
this(apiTypeClass, queue, listerWatcher, popProcessFunc, null, 0, context);
}

Expand All @@ -108,23 +98,21 @@ public void run() {
// Start the resync runnable
if (fullResyncPeriod > 0) {
ResyncRunnable resyncRunnable = new ResyncRunnable(queue, resyncFunc);
resyncFuture = resyncExecutor.scheduleAtFixedRate(resyncRunnable::run, fullResyncPeriod, fullResyncPeriod, TimeUnit.MILLISECONDS);
resyncFuture = resyncExecutor.scheduleAtFixedRate(resyncRunnable, fullResyncPeriod, fullResyncPeriod, TimeUnit.MILLISECONDS);
} else {
log.info("informer#Controller: resync skipped due to 0 full resync period");
}

synchronized (this) {
reflector = new ReflectorRunnable<T, TList>(apiTypeClass, listerWatcher, queue, operationContext);
try {
if (fullResyncPeriod > 0) {
reflectorFuture = reflectExecutor.scheduleWithFixedDelay(reflector::run, DEFAULT_DELAY_PERIOD, fullResyncPeriod, TimeUnit.MILLISECONDS);
} else {
reflectorFuture = reflectExecutor.scheduleWithFixedDelay(reflector::run, DEFAULT_DELAY_PERIOD, DEFAULT_PERIOD, TimeUnit.MILLISECONDS);
}
} catch (RejectedExecutionException e) {
log.warn("reflector list-watching job exiting because the thread-pool is shutting down");
return;
try {
if (fullResyncPeriod > 0) {
reflector = new Reflector<>(apiTypeClass, listerWatcher, queue, operationContext, fullResyncPeriod);
} else {
reflector = new Reflector<>(apiTypeClass, listerWatcher, queue, operationContext, DEFAULT_PERIOD);
}
reflector.watch();
} catch (RejectedExecutionException e) {
log.warn("Reflector list-watching job exiting because the thread-pool is shutting down", e);
return;
}

// Start the process loop
Expand All @@ -136,10 +124,7 @@ public void run() {
*/
public void stop() {
synchronized (this) {
if (reflectorFuture != null) {
reflector.stop();
reflectorFuture.cancel(true);
}
reflector.stop();
reflectExecutor.shutdown();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

Expand All @@ -38,7 +39,7 @@ public class ProcessorListener<T> implements Runnable {
private static final Logger log = LoggerFactory.getLogger(ProcessorListener.class);
private long resyncPeriodInMillis;
private ZonedDateTime nextResync;
private BlockingQueue<Notification> queue;
private BlockingQueue<Notification<T>> queue;
private ResourceEventHandler<T> handler;

public ProcessorListener(ResourceEventHandler<T> handler, long resyncPeriodInMillis) {
Expand All @@ -52,48 +53,20 @@ public ProcessorListener(ResourceEventHandler<T> handler, long resyncPeriodInMil
@Override
public void run() {
while (true) {
Boolean isValidNotification = Boolean.FALSE;
String operationType = "";
try {
Notification obj = queue.take();
if (obj instanceof UpdateNotification) {
isValidNotification = Boolean.TRUE;
operationType = "UPDATE";
UpdateNotification notification = (UpdateNotification) obj;
this.handler.onUpdate((T) notification.getOldObj(), (T) notification.getNewObj());
} else if (obj instanceof AddNotification) {
isValidNotification = Boolean.TRUE;
operationType = "ADD";
AddNotification notification = (AddNotification) obj;
this.handler.onAdd((T) notification.getNewObj());
} else if (obj instanceof DeleteNotification) {
isValidNotification = Boolean.TRUE;
operationType = "DELETE";
Object deletedObj = ((DeleteNotification) obj).getOldObj();
if (deletedObj instanceof DeltaFIFO.DeletedFinalStateUnknown) {
this.handler.onDelete(((DeltaFIFO.DeletedFinalStateUnknown<T>) deletedObj).getObj(), true);
} else {
this.handler.onDelete((T) deletedObj, false);
}
}
} catch (InterruptedException e) {
log.error("processor interrupted: {}", e.getMessage());
queue.take().handle(handler);
} catch(InterruptedException ex) {
log.error("Processor thread interrupted: {}", ex.getMessage());
Thread.currentThread().interrupt();
return;
} catch (Throwable t) {
log.error("Failed invoking " + operationType + " event handler: {}", t.getMessage());
}

if (Boolean.FALSE.equals(isValidNotification)) {
throw new RuntimeException("Unrecognized notification.");
} catch (Exception ex) {
log.error("Failed invoking {} event handler: {}", ex.getMessage());
}
}
}

public void add(Notification<T> obj) {
if (obj == null) {
return;
}
this.queue.add(obj);
Optional.ofNullable(obj).ifPresent(this.queue::add);
}

public void determineNextResync(ZonedDateTime now) {
Expand All @@ -104,45 +77,60 @@ public boolean shouldResync(ZonedDateTime now) {
return this.resyncPeriodInMillis != 0 && (now.isAfter(this.nextResync) || now.equals(this.nextResync));
}

public static class Notification<T> {}
public abstract static class Notification<T> {
private final T oldObject;
private final T newObject;

public static final class UpdateNotification<T> extends Notification<T> {
private T oldObj;
private T newObj;
public Notification(T oldObject, T newObject) {
this.oldObject = oldObject;
this.newObject = newObject;
}

public T getOldObject() {
return oldObject;
}

public UpdateNotification(T oldObj, T newObj) {
this.oldObj = oldObj;
this.newObj = newObj;
public T getNewObject() {
return newObject;
}

T getOldObj() {
return oldObj;
public abstract void handle(ResourceEventHandler<T> resourceEventHandler);
}

public static final class UpdateNotification<T> extends Notification<T> {
public UpdateNotification(T oldObject, T newObject) {
super(oldObject, newObject);
}

T getNewObj() {
return newObj;
@Override
public void handle(ResourceEventHandler<T> resourceEventHandler) {
resourceEventHandler.onUpdate(getOldObject(), getNewObject());
}
}

public static final class AddNotification<T> extends Notification<T> {
private T newObj;

public AddNotification(T newObj) {
this.newObj = newObj;
public AddNotification(T newObject) {
super(null, newObject);
}

T getNewObj() { return newObj; }
@Override
public void handle(ResourceEventHandler<T> resourceEventHandler) {
resourceEventHandler.onAdd(getNewObject());
}
}

public static final class DeleteNotification<T> extends Notification<T> {
private T oldObj;

public DeleteNotification(T oldObj) {
this.oldObj = oldObj;
public DeleteNotification(T oldObject) {
super(oldObject, null);
}

T getOldObj() {
return oldObj;
@Override
public void handle(ResourceEventHandler<T> resourceEventHandler) {
if (getOldObject() instanceof DeltaFIFO.DeletedFinalStateUnknown) {
resourceEventHandler.onDelete(((DeltaFIFO.DeletedFinalStateUnknown<T>) getOldObject()).getObj(), true);
} else {
resourceEventHandler.onDelete(getOldObject(), false);
}
}
}
}
Loading

0 comments on commit 66fa2ac

Please sign in to comment.