diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/Watcher.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/Watcher.java index 2f6a074e92b..88337a8dcc5 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/Watcher.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/Watcher.java @@ -15,8 +15,6 @@ */ package io.fabric8.kubernetes.client; -import io.fabric8.kubernetes.api.model.Status; - public interface Watcher { void eventReceived(Action action, T resource); diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/ListerWatcher.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/ListerWatcher.java index ebfc0ddc486..763cca33c32 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/ListerWatcher.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/ListerWatcher.java @@ -16,7 +16,6 @@ package io.fabric8.kubernetes.client.informers; import io.fabric8.kubernetes.api.model.ListOptions; -import io.fabric8.kubernetes.client.KubernetesClientException; import io.fabric8.kubernetes.client.Watch; import io.fabric8.kubernetes.client.Watcher; import io.fabric8.kubernetes.client.dsl.base.OperationContext; @@ -26,10 +25,10 @@ * start a watch on a resource. * * @param type - * @param list for that type + * @param list for that type */ -public interface ListerWatcher { - Watch watch(ListOptions params, String namespace, OperationContext context, Watcher watcher) throws KubernetesClientException; +public interface ListerWatcher { + Watch watch(ListOptions params, String namespace, OperationContext context, Watcher watcher); - TList list(ListOptions params, String namespace, OperationContext context) throws KubernetesClientException; + L list(ListOptions params, String namespace, OperationContext context); } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/Controller.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/Controller.java index d21bccd0a1a..226d5ec59b2 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/Controller.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/Controller.java @@ -17,7 +17,6 @@ import io.fabric8.kubernetes.api.model.HasMetadata; import io.fabric8.kubernetes.api.model.KubernetesResourceList; -import io.fabric8.kubernetes.client.dsl.base.BaseOperation; import io.fabric8.kubernetes.client.dsl.base.OperationContext; import io.fabric8.kubernetes.client.informers.ListerWatcher; import io.fabric8.kubernetes.client.informers.ResyncRunnable; @@ -40,18 +39,11 @@ * This is taken from https://github.com/kubernetes-client/java/blob/master/util/src/main/java/io/kubernetes/client/informer/cache/Controller.java * which has been ported from official go client: https://github.com/kubernetes/client-go/blob/master/tools/cache/controller.go */ -public class Controller> { +public class Controller> { private static final Logger log = LoggerFactory.getLogger(Controller.class); private static final long DEFAULT_PERIOD = 5000L; - /** - * Period controls the timing between one watch ending - * and the beginning of the next one in milliseconds. - * It's one second by default as done in go client - */ - private static final long DEFAULT_DELAY_PERIOD = 1000L; - /** * resync fifo internals in millis */ @@ -62,9 +54,9 @@ public class Controller queue; - private ListerWatcher listerWatcher; + private ListerWatcher listerWatcher; - private ReflectorRunnable reflector; + private Reflector reflector; private Supplier resyncFunc; @@ -80,9 +72,7 @@ public class Controller apiTypeClass; - private ScheduledFuture reflectorFuture; - - public Controller(Class apiTypeClass, DeltaFIFO queue, ListerWatcher listerWatcher, Consumer>> processFunc, Supplier resyncFunc, long fullResyncPeriod, OperationContext context) { + public Controller(Class apiTypeClass, DeltaFIFO queue, ListerWatcher listerWatcher, Consumer>> processFunc, Supplier resyncFunc, long fullResyncPeriod, OperationContext context) { this.queue = queue; this.listerWatcher = listerWatcher; this.apiTypeClass = apiTypeClass; @@ -98,7 +88,7 @@ public Controller(Class apiTypeClass, DeltaFIFO queue, ListerWatcher apiTypeClass, DeltaFIFO queue, ListerWatcher listerWatcher, Consumer>> popProcessFunc, OperationContext context) { + public Controller(Class apiTypeClass, DeltaFIFO queue, ListerWatcher listerWatcher, Consumer>> popProcessFunc, OperationContext context) { this(apiTypeClass, queue, listerWatcher, popProcessFunc, null, 0, context); } @@ -108,23 +98,21 @@ public void run() { // Start the resync runnable if (fullResyncPeriod > 0) { ResyncRunnable resyncRunnable = new ResyncRunnable(queue, resyncFunc); - resyncFuture = resyncExecutor.scheduleAtFixedRate(resyncRunnable::run, fullResyncPeriod, fullResyncPeriod, TimeUnit.MILLISECONDS); + resyncFuture = resyncExecutor.scheduleAtFixedRate(resyncRunnable, fullResyncPeriod, fullResyncPeriod, TimeUnit.MILLISECONDS); } else { log.info("informer#Controller: resync skipped due to 0 full resync period"); } - synchronized (this) { - reflector = new ReflectorRunnable(apiTypeClass, listerWatcher, queue, operationContext); - try { - if (fullResyncPeriod > 0) { - reflectorFuture = reflectExecutor.scheduleWithFixedDelay(reflector::run, DEFAULT_DELAY_PERIOD, fullResyncPeriod, TimeUnit.MILLISECONDS); - } else { - reflectorFuture = reflectExecutor.scheduleWithFixedDelay(reflector::run, DEFAULT_DELAY_PERIOD, DEFAULT_PERIOD, TimeUnit.MILLISECONDS); - } - } catch (RejectedExecutionException e) { - log.warn("reflector list-watching job exiting because the thread-pool is shutting down"); - return; + try { + if (fullResyncPeriod > 0) { + reflector = new Reflector<>(apiTypeClass, listerWatcher, queue, operationContext, fullResyncPeriod); + } else { + reflector = new Reflector<>(apiTypeClass, listerWatcher, queue, operationContext, DEFAULT_PERIOD); } + reflector.watch(); + } catch (RejectedExecutionException e) { + log.warn("Reflector list-watching job exiting because the thread-pool is shutting down", e); + return; } // Start the process loop @@ -136,10 +124,7 @@ public void run() { */ public void stop() { synchronized (this) { - if (reflectorFuture != null) { - reflector.stop(); - reflectorFuture.cancel(true); - } + reflector.stop(); reflectExecutor.shutdown(); } } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/ProcessorListener.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/ProcessorListener.java index 35569025769..e3e150d1069 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/ProcessorListener.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/ProcessorListener.java @@ -21,6 +21,7 @@ import java.time.ZonedDateTime; import java.time.temporal.ChronoUnit; +import java.util.Optional; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; @@ -38,7 +39,7 @@ public class ProcessorListener implements Runnable { private static final Logger log = LoggerFactory.getLogger(ProcessorListener.class); private long resyncPeriodInMillis; private ZonedDateTime nextResync; - private BlockingQueue queue; + private BlockingQueue> queue; private ResourceEventHandler handler; public ProcessorListener(ResourceEventHandler handler, long resyncPeriodInMillis) { @@ -52,48 +53,20 @@ public ProcessorListener(ResourceEventHandler handler, long resyncPeriodInMil @Override public void run() { while (true) { - Boolean isValidNotification = Boolean.FALSE; - String operationType = ""; try { - Notification obj = queue.take(); - if (obj instanceof UpdateNotification) { - isValidNotification = Boolean.TRUE; - operationType = "UPDATE"; - UpdateNotification notification = (UpdateNotification) obj; - this.handler.onUpdate((T) notification.getOldObj(), (T) notification.getNewObj()); - } else if (obj instanceof AddNotification) { - isValidNotification = Boolean.TRUE; - operationType = "ADD"; - AddNotification notification = (AddNotification) obj; - this.handler.onAdd((T) notification.getNewObj()); - } else if (obj instanceof DeleteNotification) { - isValidNotification = Boolean.TRUE; - operationType = "DELETE"; - Object deletedObj = ((DeleteNotification) obj).getOldObj(); - if (deletedObj instanceof DeltaFIFO.DeletedFinalStateUnknown) { - this.handler.onDelete(((DeltaFIFO.DeletedFinalStateUnknown) deletedObj).getObj(), true); - } else { - this.handler.onDelete((T) deletedObj, false); - } - } - } catch (InterruptedException e) { - log.error("processor interrupted: {}", e.getMessage()); + queue.take().handle(handler); + } catch(InterruptedException ex) { + log.error("Processor thread interrupted: {}", ex.getMessage()); + Thread.currentThread().interrupt(); return; - } catch (Throwable t) { - log.error("Failed invoking " + operationType + " event handler: {}", t.getMessage()); - } - - if (Boolean.FALSE.equals(isValidNotification)) { - throw new RuntimeException("Unrecognized notification."); + } catch (Exception ex) { + log.error("Failed invoking {} event handler: {}", ex.getMessage()); } } } public void add(Notification obj) { - if (obj == null) { - return; - } - this.queue.add(obj); + Optional.ofNullable(obj).ifPresent(this.queue::add); } public void determineNextResync(ZonedDateTime now) { @@ -104,45 +77,60 @@ public boolean shouldResync(ZonedDateTime now) { return this.resyncPeriodInMillis != 0 && (now.isAfter(this.nextResync) || now.equals(this.nextResync)); } - public static class Notification {} + public abstract static class Notification { + private final T oldObject; + private final T newObject; - public static final class UpdateNotification extends Notification { - private T oldObj; - private T newObj; + public Notification(T oldObject, T newObject) { + this.oldObject = oldObject; + this.newObject = newObject; + } + + public T getOldObject() { + return oldObject; + } - public UpdateNotification(T oldObj, T newObj) { - this.oldObj = oldObj; - this.newObj = newObj; + public T getNewObject() { + return newObject; } - T getOldObj() { - return oldObj; + public abstract void handle(ResourceEventHandler resourceEventHandler); + } + + public static final class UpdateNotification extends Notification { + public UpdateNotification(T oldObject, T newObject) { + super(oldObject, newObject); } - T getNewObj() { - return newObj; + @Override + public void handle(ResourceEventHandler resourceEventHandler) { + resourceEventHandler.onUpdate(getOldObject(), getNewObject()); } } public static final class AddNotification extends Notification { - private T newObj; - - public AddNotification(T newObj) { - this.newObj = newObj; + public AddNotification(T newObject) { + super(null, newObject); } - T getNewObj() { return newObj; } + @Override + public void handle(ResourceEventHandler resourceEventHandler) { + resourceEventHandler.onAdd(getNewObject()); + } } public static final class DeleteNotification extends Notification { - private T oldObj; - - public DeleteNotification(T oldObj) { - this.oldObj = oldObj; + public DeleteNotification(T oldObject) { + super(oldObject, null); } - T getOldObj() { - return oldObj; + @Override + public void handle(ResourceEventHandler resourceEventHandler) { + if (getOldObject() instanceof DeltaFIFO.DeletedFinalStateUnknown) { + resourceEventHandler.onDelete(((DeltaFIFO.DeletedFinalStateUnknown) getOldObject()).getObj(), true); + } else { + resourceEventHandler.onDelete(getOldObject(), false); + } } } } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/Reflector.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/Reflector.java new file mode 100644 index 00000000000..10e05662206 --- /dev/null +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/Reflector.java @@ -0,0 +1,128 @@ +/** + * Copyright (C) 2015 Red Hat, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.fabric8.kubernetes.client.informers.cache; + +import io.fabric8.kubernetes.api.model.HasMetadata; +import io.fabric8.kubernetes.api.model.KubernetesResourceList; +import io.fabric8.kubernetes.api.model.ListOptionsBuilder; +import io.fabric8.kubernetes.client.Watch; +import io.fabric8.kubernetes.client.dsl.base.OperationContext; +import io.fabric8.kubernetes.client.informers.ListerWatcher; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.Executors; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +public class Reflector> { + + private static final Logger log = LoggerFactory.getLogger(Reflector.class); + private static final Long WATCH_RESTART_DELAY_MILLIS = 5000L; + + private final AtomicReference lastSyncResourceVersion; + private final Class apiTypeClass; + private final ListerWatcher listerWatcher; + private final Store store; + private final OperationContext operationContext; + private final long resyncPeriodMillis; + private final ScheduledExecutorService resyncExecutor; + private final ReflectorWatcher watcher; + private final AtomicBoolean isActive; + private final AtomicBoolean isWatcherStarted; + private final AtomicReference watch; + + public Reflector(Class apiTypeClass, ListerWatcher listerWatcher, Store store, OperationContext operationContext, long resyncPeriodMillis) { + this.apiTypeClass = apiTypeClass; + this.listerWatcher = listerWatcher; + this.store = store; + this.operationContext = operationContext; + this.resyncPeriodMillis = resyncPeriodMillis; + lastSyncResourceVersion = new AtomicReference<>(); + resyncExecutor = Executors.newSingleThreadScheduledExecutor(); + watcher = new ReflectorWatcher<>(store, lastSyncResourceVersion, this::startWatcher, this::reListAndSync); + isActive = new AtomicBoolean(true); + isWatcherStarted = new AtomicBoolean(false); + watch = new AtomicReference<>(null); + } + + private L getList() { + return listerWatcher.list(new ListOptionsBuilder() + .withWatch(Boolean.FALSE) + .withResourceVersion(null) + .withTimeoutSeconds(null).build(), null, operationContext); + } + + public void watch() { + try { + log.info("Started ReflectorRunnable watch for {}", apiTypeClass); + reListAndSync(); + resyncExecutor.scheduleWithFixedDelay(this::reListAndSync, 0L, resyncPeriodMillis, TimeUnit.MILLISECONDS); + startWatcher(); + } catch (Exception exception) { + store.isPopulated(false); + throw new RejectedExecutionException("Error while starting ReflectorRunnable watch", exception); + } + } + + public void stop() { + isActive.set(false); + } + + private void reListAndSync() { + final L list = getList(); + final String latestResourceVersion = list.getMetadata().getResourceVersion(); + log.debug("Listing items ({}) for resource {} v{}", list.getItems().size(), apiTypeClass, latestResourceVersion); + lastSyncResourceVersion.set(latestResourceVersion); + store.replace(list.getItems(), latestResourceVersion); + if (!isActive.get()) { + resyncExecutor.shutdown(); + } + } + + private void startWatcher() { + log.debug("Starting watcher for resource {} v{}", apiTypeClass, lastSyncResourceVersion.get()); + if (watch.get() != null) { + log.debug("Stopping previous watcher"); + watch.get().close(); + } + if (isWatcherStarted.get()) { + log.debug("Watcher already started, delaying execution of new watcher"); + try { + Thread.sleep(WATCH_RESTART_DELAY_MILLIS); + } catch (InterruptedException e) { + log.error("Reflector thread was interrupted"); + Thread.currentThread().interrupt(); + return; + } + } + if (isActive.get()) { + isWatcherStarted.set(true); + watch.set( + listerWatcher.watch(new ListOptionsBuilder() + .withWatch(Boolean.TRUE).withResourceVersion(lastSyncResourceVersion.get()).withTimeoutSeconds(null).build(), + null, operationContext, watcher) + ); + } + } + + public String getLastSyncResourceVersion() { + return lastSyncResourceVersion.get(); + } +} diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/ReflectorRunnable.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/ReflectorRunnable.java deleted file mode 100644 index 6baee3fbdc0..00000000000 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/ReflectorRunnable.java +++ /dev/null @@ -1,173 +0,0 @@ -/** - * Copyright (C) 2015 Red Hat, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.fabric8.kubernetes.client.informers.cache; - -import io.fabric8.kubernetes.api.model.HasMetadata; -import io.fabric8.kubernetes.api.model.KubernetesResourceList; -import io.fabric8.kubernetes.api.model.ListMeta; -import io.fabric8.kubernetes.api.model.ListOptionsBuilder; -import io.fabric8.kubernetes.api.model.ObjectMeta; -import io.fabric8.kubernetes.api.model.Status; -import io.fabric8.kubernetes.client.KubernetesClientException; -import io.fabric8.kubernetes.client.Watch; -import io.fabric8.kubernetes.client.Watcher; -import io.fabric8.kubernetes.client.dsl.base.OperationContext; -import io.fabric8.kubernetes.client.informers.ListerWatcher; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.net.HttpURLConnection; -import java.util.List; -import java.util.concurrent.atomic.AtomicBoolean; - -public class ReflectorRunnable> implements Runnable { - private static final Logger log = LoggerFactory.getLogger(ReflectorRunnable.class); - - private String lastSyncResourceVersion; - private Watch watch; - private ListerWatcher listerWatcher; - private Store store; - private Class apiTypeClass; - private AtomicBoolean isActive = new AtomicBoolean(true); - private OperationContext operationContext; - // isLastSyncResourceVersionGone is true if the previous list or watch request with lastSyncResourceVersion - // failed with an HTTP 410 (Gone) status code. - private boolean isLastSyncResourceVersionGone = false; - - public ReflectorRunnable(Class apiTypeClass, ListerWatcher listerWatcher, Store store, OperationContext operationContext) { - this.listerWatcher = listerWatcher; - this.store = store; - this.apiTypeClass = apiTypeClass; - this.operationContext = operationContext; - } - - /** - * Run first lists all items and get the resource version at the moment of call - * and then use the resource version to watch. - */ - public void run() { - try { - Thread.currentThread().setName("Reflector-" + apiTypeClass.getSimpleName()); - log.info("{}#Start listing and watching...", apiTypeClass); - - TList list = listerWatcher.list(new ListOptionsBuilder().withWatch(Boolean.FALSE).withResourceVersion(null).withTimeoutSeconds(null).build(), null, operationContext); - - ListMeta listMeta = list.getMetadata(); - String resourceVersion = listMeta.getResourceVersion(); - List items = list.getItems(); - - if (log.isDebugEnabled()) { - log.debug("{}#Extract resourceVersion {} list meta", apiTypeClass, resourceVersion); - } - this.syncWith(items, resourceVersion); - this.lastSyncResourceVersion = resourceVersion; - this.isLastSyncResourceVersionGone = false; - - if (log.isDebugEnabled()) { - log.debug("{}#Start watching with {}...", apiTypeClass, lastSyncResourceVersion); - } - - if (!isActive.get()) { - if (watch != null) { - log.info("Closing watch"); - watch.close(); - return; - } - } - if (watch != null) { - log.info("Closing existing watch and waiting"); - watch.close(); - } - try { - // Use resource version to watch - watch = listerWatcher.watch(new ListOptionsBuilder().withWatch(Boolean.TRUE).withResourceVersion(resourceVersion).withTimeoutSeconds(null).build(), - null, operationContext, new Watcher() { - @Override - public void eventReceived(Action action, T resource) { - log.info("Event received ", action.name()); - if (action == null) { - log.error("unrecognized event {}", resource); - } - if (action == Action.ERROR) { - String errorMessage = String.format("got ERROR event for ", resource.getMetadata().getName()); - log.error(errorMessage); - throw new RuntimeException(errorMessage); - } - - ObjectMeta meta = resource.getMetadata(); - String newResourceVersion = meta.getResourceVersion(); - switch (action) { - case ADDED: - store.add(resource); - break; - case MODIFIED: - store.update(resource); - break; - case DELETED: - store.delete(resource); - break; - } - lastSyncResourceVersion = newResourceVersion; - if (log.isDebugEnabled()) { - log.debug("{}#Receiving resourceVersion {}", apiTypeClass, lastSyncResourceVersion); - } - } - - @Override - public void onClose(KubernetesClientException exception) { - log.error("Watch closing."); - if (exception != null) { - log.error("watch closed due to " + exception.getMessage()); - // Relist when HTTP_GONE is received - Status returnStatus = exception.getStatus(); - if (returnStatus.getCode().equals(HttpURLConnection.HTTP_GONE)) { - isLastSyncResourceVersionGone = true; - log.info("410(HTTP_GONE) recieved, initiating re-list and re-watch"); - run(); - } else { - log.debug("exception received during watch", exception); - } - } - } - }); - } catch (Throwable t) { - log.info("{}#Watch connection got exception {}", apiTypeClass, t.getMessage()); - } - } catch (Exception exception) { - log.error("Failure in list-watch: {}, cause: {}", exception.getMessage(), exception.getCause()); - log.debug("exception in listing and watching", exception); - // Update store sync status to false - store.isPopulated(false); - } - } - - public void stop() { - isActive.set(false); - } - - private void syncWith(List items, String resourceVersion) { - this.store.replace(items, resourceVersion); - } - - public String getLastSyncResourceVersion() { - return lastSyncResourceVersion; - } - - public boolean isLastSyncResourceVersionGone() { - return isLastSyncResourceVersionGone; - } - -} diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/ReflectorWatcher.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/ReflectorWatcher.java new file mode 100644 index 00000000000..a1c4f700ee8 --- /dev/null +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/ReflectorWatcher.java @@ -0,0 +1,87 @@ +/** + * Copyright (C) 2015 Red Hat, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.fabric8.kubernetes.client.informers.cache; + +import io.fabric8.kubernetes.api.model.HasMetadata; +import io.fabric8.kubernetes.api.model.Status; +import io.fabric8.kubernetes.client.KubernetesClientException; +import io.fabric8.kubernetes.client.Watcher; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.HttpURLConnection; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicReference; + +public class ReflectorWatcher implements Watcher { + + private static final Logger log = LoggerFactory.getLogger(ReflectorWatcher.class); + + private final Store store; + private final AtomicReference lastSyncResourceVersion; + private final Runnable onClose; + private final Runnable onHttpGone; + + public ReflectorWatcher(Store store, AtomicReference lastSyncResourceVersion, Runnable onClose, Runnable onHttpGone) { + this.store = store; + this.lastSyncResourceVersion = lastSyncResourceVersion; + this.onClose = onClose; + this.onHttpGone = onHttpGone; + } + + @Override + public void eventReceived(Action action, T resource) { + if (action == null) { + final String errorMessage = String.format("Unrecognized event %s", resource.getMetadata().getName()); + log.error(errorMessage); + throw new KubernetesClientException(errorMessage); + } + log.info("Event received {}", action.name()); + switch (action) { + case ERROR: + final String errorMessage = String.format("ERROR event for %s", resource.getMetadata().getName()); + log.error(errorMessage); + throw new KubernetesClientException(errorMessage); + case ADDED: + store.add(resource); + break; + case MODIFIED: + store.update(resource); + break; + case DELETED: + store.delete(resource); + break; + } + lastSyncResourceVersion.set(resource.getMetadata().getResourceVersion()); + log.debug("{}#Receiving resourceVersion {}", resource.getKind(), lastSyncResourceVersion.get()); + } + + @Override + public void onClose(KubernetesClientException exception) { + log.error("Watch closing"); + Optional.ofNullable(exception) + .map(e -> { + log.debug("Exception received during watch", e); + return exception; + }) + .map(KubernetesClientException::getStatus) + .map(Status::getCode) + .filter(c -> c.equals(HttpURLConnection.HTTP_GONE)) + .ifPresent(c -> onHttpGone.run()); + onClose.run(); + } + +} diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/SharedProcessor.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/SharedProcessor.java index 46477fcd4b2..261ea2d30af 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/SharedProcessor.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/SharedProcessor.java @@ -92,7 +92,7 @@ public void run() { if (listeners == null || listeners.isEmpty()) { return; } - for (ProcessorListener listener : listeners) { + for (ProcessorListener listener : listeners) { executorService.submit(listener); } } finally { @@ -130,7 +130,7 @@ public boolean shouldResync() { this.syncingListeners = new ArrayList<>(); ZonedDateTime now = ZonedDateTime.now(); - for (ProcessorListener listener : this.listeners) { + for (ProcessorListener listener : this.listeners) { if (listener.shouldResync(now)) { resyncNeeded = true; this.syncingListeners.add(listener); diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformer.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformer.java index cc09b5a35ca..44833b47ea5 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformer.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformer.java @@ -36,7 +36,7 @@ import java.util.Map; import java.util.function.Function; -public class DefaultSharedIndexInformer> implements SharedIndexInformer { +public class DefaultSharedIndexInformer> implements SharedIndexInformer { private static final Logger log = LoggerFactory.getLogger(DefaultSharedIndexInformer.class); private static final long MINIMUM_RESYNC_PERIOD_MILLIS = 1000L; @@ -54,23 +54,23 @@ public class DefaultSharedIndexInformer processor; - private Controller controller; + private Controller controller; private Thread controllerThread; private volatile boolean started = false; private volatile boolean stopped = false; - public DefaultSharedIndexInformer(Class apiTypeClass, ListerWatcher listerWatcher, long resyncPeriod, OperationContext context) { + public DefaultSharedIndexInformer(Class apiTypeClass, ListerWatcher listerWatcher, long resyncPeriod, OperationContext context) { this.resyncCheckPeriodMillis = resyncPeriod; this.defaultEventHandlerResyncPeriod = resyncPeriod; this.processor = new SharedProcessor<>(); this.indexer = new Cache(); - DeltaFIFO fifo = new DeltaFIFO(Cache::metaNamespaceKeyFunc, this.indexer); + DeltaFIFO fifo = new DeltaFIFO<>(Cache::metaNamespaceKeyFunc, this.indexer); - this.controller = new Controller(apiTypeClass, fifo, listerWatcher, this::handleDeltas, processor::shouldResync, resyncCheckPeriodMillis, context); + this.controller = new Controller<>(apiTypeClass, fifo, listerWatcher, this::handleDeltas, processor::shouldResync, resyncCheckPeriodMillis, context); controllerThread = new Thread(controller::run, "informer-controller-" + apiTypeClass.getSimpleName()); } diff --git a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/cache/DeltaFIFOTest.java b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/cache/DeltaFIFOTest.java index a8f3a7997d5..7418d304d26 100644 --- a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/cache/DeltaFIFOTest.java +++ b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/cache/DeltaFIFOTest.java @@ -30,7 +30,7 @@ public class DeltaFIFOTest { public void testBasic() throws InterruptedException { Deque> receivingDeltas = new LinkedList<>(); Pod foo1 = new PodBuilder().withNewMetadata().withName("pod1").withNamespace("default").endMetadata().build(); - Cache cache = new Cache(); + Cache cache = new Cache<>(); DeltaFIFO deltaFIFO = new DeltaFIFO<>(Cache::deletionHandlingMetaNamespaceKeyFunc, cache); AbstractMap.SimpleEntry receivingDelta; @@ -91,7 +91,7 @@ public void testBasic() throws InterruptedException { @Test public void testDeduplication() { Pod foo1 = new PodBuilder().withNewMetadata().withName("foo1").withNamespace("default").endMetadata().build(); - Cache cache = new Cache(); + Cache cache = new Cache<>(); DeltaFIFO deltaFIFO = new DeltaFIFO<>(Cache::deletionHandlingMetaNamespaceKeyFunc, cache); Deque> deltas; @@ -139,7 +139,7 @@ public void testReplaceWithDeleteDeltaIn() throws InterruptedException { Pod oldPod = new PodBuilder().withNewMetadata().withNamespace("default").withName("foo1").endMetadata().build(); Pod newPod = new PodBuilder().withNewMetadata().withNamespace("default").withName("foo2").endMetadata().build(); - Cache mockCache = mock(Cache.class); + Cache mockCache = mock(Cache.class); doReturn(oldPod).when(mockCache).getByKey(Cache.deletionHandlingMetaNamespaceKeyFunc(oldPod)); DeltaFIFO deltaFIFO = new DeltaFIFO<>(Cache::deletionHandlingMetaNamespaceKeyFunc, mockCache); diff --git a/kubernetes-examples/src/main/java/io/fabric8/kubernetes/examples/InformerExample.java b/kubernetes-examples/src/main/java/io/fabric8/kubernetes/examples/InformerExample.java index 8d13aa2008e..0e6d052a316 100644 --- a/kubernetes-examples/src/main/java/io/fabric8/kubernetes/examples/InformerExample.java +++ b/kubernetes-examples/src/main/java/io/fabric8/kubernetes/examples/InformerExample.java @@ -27,39 +27,38 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; import java.util.Collections; import java.util.concurrent.TimeUnit; public class InformerExample { private static Logger logger = LoggerFactory.getLogger(InformerExample.class); - public static void main(String args[]) throws IOException, InterruptedException { + public static void main(String[] args) throws InterruptedException { try (final KubernetesClient client = new DefaultKubernetesClient()) { SharedInformerFactory sharedInformerFactory = client.informers(); - SharedIndexInformer podInformer = sharedInformerFactory.sharedIndexInformerFor(Pod.class, PodList.class, 15 * 60 * 1000); - log("Informer factory initialized."); + SharedIndexInformer podInformer = sharedInformerFactory.sharedIndexInformerFor(Pod.class, PodList.class, 30 * 1000L); + logger.info("Informer factory initialized."); podInformer.addEventHandler( new ResourceEventHandler() { @Override public void onAdd(Pod pod) { - log(pod.getMetadata().getName() + " pod added\n"); + logger.info("{} pod added", pod.getMetadata().getName()); } @Override public void onUpdate(Pod oldPod, Pod newPod) { - log(oldPod.getMetadata().getName() + " pod updated\n"); + logger.info("{} pod updated", oldPod.getMetadata().getName()); } @Override public void onDelete(Pod pod, boolean deletedFinalStateUnknown) { - log(pod.getMetadata().getName() + " pod deleted \n"); + logger.info("{} pod deleted", pod.getMetadata().getName()); } } ); - log("Starting all registered informers"); + logger.info("Starting all registered informers"); sharedInformerFactory.startAllRegisteredInformers(); Pod testPod = new PodBuilder() .withNewMetadata().withName("myapp-pod").withLabels(Collections.singletonMap("app", "myapp-pod")).endMetadata() @@ -78,29 +77,21 @@ public void onDelete(Pod pod, boolean deletedFinalStateUnknown) { .build(); client.pods().inNamespace("default").create(testPod); - log("Pod created"); + logger.info("Pod created"); Thread.sleep(3000L); Lister podLister = new Lister<> (podInformer.getIndexer(), "default"); Pod myPod = podLister.get("myapp-pod"); - log("PodLister has " + podLister.list().size()); + logger.info("PodLister has {}", podLister.list().size()); if (myPod != null) { - log("***** myapp-pod created %s", myPod.getMetadata().getCreationTimestamp()); + logger.info("***** myapp-pod created {}", myPod.getMetadata().getCreationTimestamp()); } // Wait for some time now - TimeUnit.MINUTES.sleep(15); + TimeUnit.MINUTES.sleep(3); sharedInformerFactory.stopAllRegisteredInformers(); } } - - private static void log(String action, Object obj) { - logger.info("{}: {}", action, obj); - } - - private static void log(String action) { - logger.info(action); - } }