Skip to content

Commit

Permalink
Update waitUntilCondition to use a watcher instead of polling. All waitUntil* methods should end up calling waitUntilCondition. Retry all exceptions except HTTP_GONE, with backoff.
Browse files Browse the repository at this point in the history
  • Loading branch information
bbeaudreault committed May 19, 2020
1 parent c96b1db commit 3342279
Show file tree
Hide file tree
Showing 26 changed files with 804 additions and 351 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@
import java.util.Collection;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;

/**
* Class for Default Kubernetes Client implementing KubernetesClient interface.
Expand Down Expand Up @@ -180,13 +181,13 @@ public ParameterNamespaceListVisitFromServerGetDeleteRecreateWaitApplicable<HasM

@Override
public NamespaceVisitFromServerGetWatchDeleteRecreateWaitApplicable<HasMetadata, Boolean> resource(HasMetadata item) {
return new NamespaceVisitFromServerGetWatchDeleteRecreateWaitApplicableImpl(httpClient, getConfiguration(), getNamespace(), null, false, false, new ArrayList<Visitor>(), item, -1, DeletionPropagation.BACKGROUND, true) {
return new NamespaceVisitFromServerGetWatchDeleteRecreateWaitApplicableImpl(httpClient, getConfiguration(), getNamespace(), null, false, false, new ArrayList<Visitor>(), item, -1, DeletionPropagation.BACKGROUND, true, 5, 2) {
};
}

@Override
public NamespaceVisitFromServerGetWatchDeleteRecreateWaitApplicable<HasMetadata, Boolean> resource(String s) {
return new NamespaceVisitFromServerGetWatchDeleteRecreateWaitApplicableImpl(httpClient, getConfiguration(), getNamespace(), null, false, false, new ArrayList<Visitor>(), s, -1, DeletionPropagation.BACKGROUND, true) {
return new NamespaceVisitFromServerGetWatchDeleteRecreateWaitApplicableImpl(httpClient, getConfiguration(), getNamespace(), null, false, false, new ArrayList<Visitor>(), s, -1, DeletionPropagation.BACKGROUND, true, 5, 2) {
};
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package io.fabric8.kubernetes.client;

import java.util.concurrent.TimeUnit;

/**
 * Contract for DSL elements that let the caller configure the retry backoff used while waiting for a condition.
 *
 * @param <T> the type returned by {@link #withWaitRetryBackoff(long, TimeUnit, double)}
 */
public interface WaitRetryBackoffConfigurable<T> {

/**
 * Configure the backoff strategy to use when waiting for conditions, in case the watcher encounters a retryable error.
 *
 * @param initialBackoff the value for the initial backoff on first error
 * @param backoffUnit the TimeUnit for the initial backoff value
 * @param backoffMultiplier what to multiply the backoff by on each subsequent error
 * @return an object of type {@code T} configured with the given backoff, so that further calls can be chained
 */
T withWaitRetryBackoff(long initialBackoff, TimeUnit backoffUnit, double backoffMultiplier);
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,5 @@
public interface Resource<T, D> extends CreateOrReplaceable<T, T, D>, CreateFromServerGettable<T, T, D>,
CascadingEditReplacePatchDeletable<T, T, D, Boolean>,
VersionWatchable<Watch, Watcher<T>>,
Waitable<T, T>, Requirable<T>, Readiable {
WatchingWaitableWithBackoff<T, T>, Requirable<T>, Readiable {
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public interface VisitFromServerGetWatchDeleteRecreateWaitApplicable<T, B> exten
FromServerGettable<T>, RecreateApplicable<T, T>,
CascadingDeletable<B>,
Watchable<Watch, Watcher<T>>,
Waitable<T, T>,
WatchingWaitableWithBackoff<T, T>,
GracePeriodConfigurable<CascadingDeletable<B>>,
PropagationPolicyConfigurable<CascadingDeletable<B>> {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package io.fabric8.kubernetes.client.dsl;

import io.fabric8.kubernetes.client.WaitRetryBackoffConfigurable;

/**
 * Combines {@link Waitable} with {@link WaitRetryBackoffConfigurable}.
 *
 * <p>The backoff-configuration method inherited from {@link WaitRetryBackoffConfigurable} is typed to return
 * this same interface, so the wait methods remain available on the configured object.
 *
 * @param <T> first type argument, passed through unchanged to {@link Waitable}
 * @param <P> second type argument, passed through unchanged to {@link Waitable}
 */
public interface WatchingWaitableWithBackoff<T, P> extends Waitable<T, P>, WaitRetryBackoffConfigurable<WatchingWaitableWithBackoff<T, P>> {
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
*/
package io.fabric8.kubernetes.client.dsl.base;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.fabric8.kubernetes.api.builder.Function;
import io.fabric8.kubernetes.api.model.DeletionPropagation;
import io.fabric8.kubernetes.api.model.Doneable;
Expand Down Expand Up @@ -43,6 +46,7 @@
import io.fabric8.kubernetes.client.dsl.Replaceable;
import io.fabric8.kubernetes.client.dsl.Resource;
import io.fabric8.kubernetes.client.dsl.Watchable;
import io.fabric8.kubernetes.client.dsl.base.WaitForConditionWatcher.WatchException;
import io.fabric8.kubernetes.client.dsl.internal.DefaultOperationInfo;
import io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager;
import io.fabric8.kubernetes.client.dsl.internal.WatchHTTPManager;
Expand All @@ -66,17 +70,20 @@
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;
import okhttp3.HttpUrl;
import okhttp3.Request;

public class BaseOperation<T, L extends KubernetesResourceList, D extends Doneable<T>, R extends Resource<T, D>>
public class BaseOperation<T extends HasMetadata, L extends KubernetesResourceList, D extends Doneable<T>, R extends Resource<T, D>>
extends OperationSupport
implements
OperationInfo,
MixedOperation<T, L, D, R>,
Resource<T,D> {

private static final Logger LOG = LoggerFactory.getLogger(BaseOperation.class);

private final Boolean cascading;
private final T item;

Expand All @@ -92,6 +99,8 @@ public class BaseOperation<T, L extends KubernetesResourceList, D extends Doneab
private final Boolean reloadingFromServer;
private final long gracePeriodSeconds;
private final DeletionPropagation propagationPolicy;
private final long watchRetryInitialBackoffMillis;
private final double watchRetryBackoffMultiplier;

protected String apiVersion;

Expand All @@ -113,6 +122,8 @@ protected BaseOperation(OperationContext ctx) {
this.labelsNotIn = ctx.getLabelsNotIn();
this.fields = ctx.getFields();
this.fieldsNot = ctx.getFieldsNot();
this.watchRetryInitialBackoffMillis = ctx.getWatchRetryInitialBackoffMillis();
this.watchRetryBackoffMultiplier = ctx.getWatchRetryBackoffMultiplier();
}

/**
Expand Down Expand Up @@ -991,6 +1002,11 @@ public FilterWatchListDeletable<T, L, Boolean, Watch, Watcher<T>> withPropagatio
return newInstance(context.withPropagationPolicy(propagationPolicy));
}

/**
 * Builds a new operation instance whose context carries the given wait-retry backoff settings.
 *
 * @param initialBackoff the initial backoff value, expressed in {@code backoffUnit}
 * @param backoffUnit the unit of {@code initialBackoff}; the value is converted to milliseconds before being stored
 * @param backoffMultiplier the multiplier stored in the context alongside the initial backoff
 * @return the instance created from the updated context
 */
@Override
public BaseOperation<T, L, D, R> withWaitRetryBackoff(long initialBackoff, TimeUnit backoffUnit, double backoffMultiplier) {
  // The context keeps the initial backoff in milliseconds, whatever unit the caller used.
  final long initialBackoffMillis = backoffUnit.toMillis(initialBackoff);
  return newInstance(
    context
      .withWatchRetryInitialBackoffMillis(initialBackoffMillis)
      .withWatchRetryBackoffMultiplier(backoffMultiplier));
}

protected Class<? extends Config> getConfigType() {
return Config.class;
}
Expand Down Expand Up @@ -1068,61 +1084,51 @@ public Boolean isReady() {
return i instanceof HasMetadata && Readiness.isReady((HasMetadata)i);
}

/**
 * Waits until the resource is observed as non-null by delegating to
 * {@code waitUntilCondition} with a non-null check.
 *
 * @param amount the maximum amount of time to wait
 * @param timeUnit the unit of {@code amount}
 * @return whatever {@code waitUntilCondition} returns for the non-null check
 * @throws InterruptedException propagated from {@code waitUntilCondition}
 */
protected T waitUntilExists(long amount, TimeUnit timeUnit) throws InterruptedException {
  final Predicate<T> exists = resource -> resource != null;
  return waitUntilCondition(exists, amount, timeUnit);
}

@Override
public T waitUntilReady(long amount, TimeUnit timeUnit) throws InterruptedException {

long timeoutInNanos = timeUnit.toNanos(amount);
long end = System.nanoTime() + timeoutInNanos;

while (System.nanoTime() < end) {
T item = fromServer().get();
try {
if (Readiness.isReady((HasMetadata) item)) {
return item;
}

Thread.sleep(500);
} catch (IllegalArgumentException illegalArgumentException) {
// This might be thrown if Resource passed doesn't comply with concept of "readiness"
throw illegalArgumentException;
}
}

T item = fromServer().get();
if (Readiness.isReady((HasMetadata) item)) {
return item;
}

throw new IllegalStateException(type.getSimpleName() + " with name:[" + name + "] in namespace:[" + namespace + "] not ready!");
return waitUntilCondition(resource -> Objects.nonNull(resource) && Readiness.isReady(resource), amount, timeUnit);
}

/**
 * Waits for {@code condition} to hold, delegating to {@code waitUntilConditionWithRetries}
 * with the timeout converted to nanoseconds and the configured initial retry backoff.
 *
 * @param condition the predicate that must be satisfied
 * @param amount the maximum amount of time to wait
 * @param timeUnit the unit of {@code amount}
 * @return the value returned by {@code waitUntilConditionWithRetries}
 * @throws InterruptedException propagated from {@code waitUntilConditionWithRetries}
 */
@Override
public T waitUntilCondition(Predicate<T> condition, long amount, TimeUnit timeUnit)
  throws InterruptedException {
  final long timeoutNanos = timeUnit.toNanos(amount);
  return waitUntilConditionWithRetries(condition, timeoutNanos, watchRetryInitialBackoffMillis);
}

long timeoutInMillis = timeUnit.toNanos(amount);

long end = System.nanoTime() + timeoutInMillis;
while (System.nanoTime() < end) {
T item = get();
if (condition.test(item)) {
return item;
}

// in the future, this should probably be more intelligent
Thread.sleep(500);
}
/**
 * Waits for {@code condition} to hold for this resource, using a watch and retrying retryable watch failures.
 *
 * <p>The resource is first fetched from the server and returned immediately if it already satisfies the
 * condition. Otherwise a {@link WaitForConditionWatcher} is registered (starting from the fetched item's
 * resource version when an item was found) and its future is awaited for up to {@code timeoutNanos}.
 * When the future fails with a {@link WatchException} flagged as retryable, the method sleeps for
 * {@code backoffMillis} and calls itself with the time left until the deadline and with the backoff
 * multiplied by {@code watchRetryBackoffMultiplier}.
 *
 * @param condition predicate tested against the fetched item and handed to the watcher
 * @param timeoutNanos maximum time to wait on the watcher's future, in nanoseconds
 * @param backoffMillis time to sleep, in milliseconds, before the next attempt after a retryable failure
 * @return the fetched item when it already satisfies the condition, otherwise the value the watcher's future completes with
 * @throws InterruptedException if the thread is interrupted while waiting on the future or sleeping
 * @throws IllegalArgumentException if the timeout elapses before the watcher's future completes
 */
private T waitUntilConditionWithRetries(Predicate<T> condition, long timeoutNanos, long backoffMillis)
throws InterruptedException {

// NOTE(review): this line and the next both declare 'item'; this one appears to be the
// pre-change line kept by the diff view, with fromServer().get() being the new call — confirm.
T item = get();
T item = fromServer().get();
if (condition.test(item)) {
return item;
}

// NOTE(review): this throw appears to be the pre-change ending kept by the diff view;
// the statements below it belong to the watcher-based implementation — confirm.
throw new IllegalArgumentException(type.getSimpleName() + " with name:[" + name + "] in namespace:[" + namespace + "] not found!");
// Deadline used to work out how much time is left when a retry is needed.
long end = System.nanoTime() + timeoutNanos;

WaitForConditionWatcher<T> watcher = new WaitForConditionWatcher<>(condition);
// With no item fetched there is no resource version to start from, so null is passed instead.
Watch watch = item == null
? watch(null, watcher)
: watch(item.getMetadata().getResourceVersion(), watcher);

try {
return watcher.getFuture()
.get(timeoutNanos, TimeUnit.NANOSECONDS);
} catch (ExecutionException e) {
if (e.getCause() instanceof WatchException && ((WatchException) e.getCause()).isShouldRetry()) {
// NOTE(review): the finally block below closes this same watch again on this path.
watch.close();
LOG.debug("retryable watch exception encountered, retrying after {} millis", backoffMillis, e.getCause());
// NOTE(review): the sleep is not capped by the deadline, so newTimeout can be zero or negative.
Thread.sleep(backoffMillis);
long newTimeout = end - System.nanoTime();
long newBackoff = (long) (backoffMillis * watchRetryBackoffMultiplier);
return waitUntilConditionWithRetries(condition, newTimeout, newBackoff);
}
// Non-retryable failures are rethrown through KubernetesClientException.launderThrowable.
throw KubernetesClientException.launderThrowable(e.getCause());
} catch (TimeoutException e) {
LOG.debug("ran out of time waiting for watcher, wait condition not met");
// NOTE(review): the TimeoutException is not attached as the cause of the exception thrown here.
throw new IllegalArgumentException(type.getSimpleName() + " with name:[" + name + "] in namespace:[" + namespace + "] matching condition not found!");
} finally {
watch.close();
}
}

public void setType(Class<T> type) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,16 @@

package io.fabric8.kubernetes.client.dsl.base;

import java.lang.reflect.InvocationTargetException;
import java.util.concurrent.TimeUnit;

import io.fabric8.kubernetes.api.builder.Function;
import io.fabric8.kubernetes.api.model.DeletionPropagation;
import io.fabric8.kubernetes.api.model.Doneable;
import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.api.model.KubernetesResourceList;
import io.fabric8.kubernetes.api.model.ReplicationController;
import io.fabric8.kubernetes.client.KubernetesClientException;
import io.fabric8.kubernetes.client.KubernetesClientTimeoutException;
import io.fabric8.kubernetes.client.Watch;
import io.fabric8.kubernetes.client.dsl.Resource;
import io.fabric8.kubernetes.client.internal.readiness.Readiness;
import io.fabric8.kubernetes.client.internal.readiness.ReadinessWatcher;
import java.lang.reflect.InvocationTargetException;
import java.util.concurrent.TimeUnit;

public class HasMetadataOperation<T extends HasMetadata, L extends KubernetesResourceList, D extends Doneable<T>, R extends Resource<T, D>>
extends BaseOperation< T, L, D, R> {
Expand Down Expand Up @@ -157,53 +153,4 @@ public T patch(T item) {
}
throw KubernetesClientException.launderThrowable(forOperationType("patch"), caught);
}

/**
* A wait method that combines watching and polling.
* The need for that is that in some cases a pure watcher approach consistently fails.
* @param i The number of iterations to perform.
* @param started Time in milliseconds where the watch started.
* @param interval The amount of time in millis to wait on each iteration.
* @param amount The maximum amount in millis of time since started to wait.
* @return The {@link ReplicationController} if ready.
*/
protected T periodicWatchUntilReady(int i, long started, long interval, long amount) {
T item = fromServer().get();
if (Readiness.isReady(item)) {
return item;
}

ReadinessWatcher<T> watcher = new ReadinessWatcher<>(item);
try (Watch watch = watch(item.getMetadata().getResourceVersion(), watcher)) {
try {
return watcher.await(interval, TimeUnit.NANOSECONDS);
} catch (KubernetesClientTimeoutException e) {
if (i <= 0) {
throw e;
}
}

long remaining = (started + amount) - System.nanoTime();
long next = Math.max(0, Math.min(remaining, interval));
return periodicWatchUntilReady(i - 1, started, next, amount);
}
}

@Override
public T waitUntilReady(long amount, TimeUnit timeUnit) throws InterruptedException {
if (Readiness.isReadinessApplicable(getType())) {
long started = System.nanoTime();
waitUntilExists(amount, timeUnit);
long alreadySpent = System.nanoTime() - started;

long remaining = timeUnit.toNanos(amount) - alreadySpent;
if (remaining <= 0) {
return periodicWatchUntilReady(0, System.nanoTime(), 0, 0);
}

return periodicWatchUntilReady(10, System.nanoTime(), Math.max(remaining / 10, 1000000000L), remaining);
}

return super.waitUntilReady(amount, timeUnit);
}
}
Loading

0 comments on commit 3342279

Please sign in to comment.