Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update waitUntilCondition to use a watcher instead of polling. #2239

Merged
merged 5 commits into from
May 22, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ public ParameterNamespaceListVisitFromServerGetDeleteRecreateWaitApplicable<HasM

@Override
public NamespaceListVisitFromServerGetDeleteRecreateWaitApplicable<HasMetadata, Boolean> resourceList(KubernetesResourceList item) {
return new NamespaceVisitFromServerGetWatchDeleteRecreateWaitApplicableListImpl(httpClient, getConfiguration(), getNamespace(), null, false, false, new ArrayList<Visitor>(), item, null, null, -1, DeletionPropagation.BACKGROUND, true) {
return new NamespaceVisitFromServerGetWatchDeleteRecreateWaitApplicableListImpl(httpClient, getConfiguration(), getNamespace(), null, false, false, new ArrayList<>(), item, null, DeletionPropagation.BACKGROUND, true) {
};
}

Expand All @@ -175,20 +175,20 @@ public NamespaceListVisitFromServerGetDeleteRecreateWaitApplicable<HasMetadata,

@Override
public ParameterNamespaceListVisitFromServerGetDeleteRecreateWaitApplicable<HasMetadata, Boolean> resourceList(String s) {
return new NamespaceVisitFromServerGetWatchDeleteRecreateWaitApplicableListImpl(httpClient, getConfiguration(), getNamespace(), null, false, false, new ArrayList<Visitor>(), s, null, null, -1, DeletionPropagation.BACKGROUND, true) {
return new NamespaceVisitFromServerGetWatchDeleteRecreateWaitApplicableListImpl(httpClient, getConfiguration(), getNamespace(), null, false, false, new ArrayList<>(), s, null, DeletionPropagation.BACKGROUND, true) {
};
}


@Override
public NamespaceVisitFromServerGetWatchDeleteRecreateWaitApplicable<HasMetadata, Boolean> resource(HasMetadata item) {
return new NamespaceVisitFromServerGetWatchDeleteRecreateWaitApplicableImpl(httpClient, getConfiguration(), getNamespace(), null, false, false, new ArrayList<Visitor>(), item, -1, DeletionPropagation.BACKGROUND, true) {
return new NamespaceVisitFromServerGetWatchDeleteRecreateWaitApplicableImpl(httpClient, getConfiguration(), getNamespace(), null, false, false, new ArrayList<Visitor>(), item, -1, DeletionPropagation.BACKGROUND, true, Waitable.DEFAULT_INITIAL_BACKOFF_MILLIS, Waitable.DEFAULT_BACKOFF_MULTIPLIER) {
};
}

@Override
public NamespaceVisitFromServerGetWatchDeleteRecreateWaitApplicable<HasMetadata, Boolean> resource(String s) {
return new NamespaceVisitFromServerGetWatchDeleteRecreateWaitApplicableImpl(httpClient, getConfiguration(), getNamespace(), null, false, false, new ArrayList<Visitor>(), s, -1, DeletionPropagation.BACKGROUND, true) {
return new NamespaceVisitFromServerGetWatchDeleteRecreateWaitApplicableImpl(httpClient, getConfiguration(), getNamespace(), null, false, false, new ArrayList<Visitor>(), s, -1, DeletionPropagation.BACKGROUND, true, Waitable.DEFAULT_INITIAL_BACKOFF_MILLIS, Waitable.DEFAULT_BACKOFF_MULTIPLIER) {
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,19 @@

public interface Waitable<T, P> {

T waitUntilReady(long amount, TimeUnit timeUnit) throws InterruptedException;
long DEFAULT_INITIAL_BACKOFF_MILLIS = 5L;
double DEFAULT_BACKOFF_MULTIPLIER = 2d;

T waitUntilCondition(Predicate<P> condition, long amount, TimeUnit timeUnit) throws InterruptedException;
T waitUntilReady(long amount, TimeUnit timeUnit) throws InterruptedException;

T waitUntilCondition(Predicate<P> condition, long amount, TimeUnit timeUnit) throws InterruptedException;

/**
* Configure the backoff strategy to use when waiting for conditions, in case the watcher encounters a retryable error.
* @param initialBackoff the value for the initial backoff on first error
* @param backoffUnit the TimeUnit for the initial backoff value
* @param backoffMultiplier what to multiply the backoff by on each subsequent error
* @return a Waitable that applies the given retry backoff when waiting for conditions
*/
Waitable<T, P> withWaitRetryBackoff(long initialBackoff, TimeUnit backoffUnit, double backoffMultiplier);
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
*/
package io.fabric8.kubernetes.client.dsl.base;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.fabric8.kubernetes.api.builder.Function;
import io.fabric8.kubernetes.api.model.DeletionPropagation;
import io.fabric8.kubernetes.api.model.Doneable;
Expand Down Expand Up @@ -43,6 +46,7 @@
import io.fabric8.kubernetes.client.dsl.Replaceable;
import io.fabric8.kubernetes.client.dsl.Resource;
import io.fabric8.kubernetes.client.dsl.Watchable;
import io.fabric8.kubernetes.client.dsl.base.WaitForConditionWatcher.WatchException;
import io.fabric8.kubernetes.client.dsl.internal.DefaultOperationInfo;
import io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager;
import io.fabric8.kubernetes.client.dsl.internal.WatchHTTPManager;
Expand All @@ -67,17 +71,20 @@
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;
import okhttp3.HttpUrl;
import okhttp3.Request;

public class BaseOperation<T, L extends KubernetesResourceList, D extends Doneable<T>, R extends Resource<T, D>>
public class BaseOperation<T extends HasMetadata, L extends KubernetesResourceList, D extends Doneable<T>, R extends Resource<T, D>>
extends OperationSupport
implements
OperationInfo,
MixedOperation<T, L, D, R>,
Resource<T,D> {

private static final Logger LOG = LoggerFactory.getLogger(BaseOperation.class);

private final Boolean cascading;
private final T item;

Expand All @@ -93,6 +100,8 @@ public class BaseOperation<T, L extends KubernetesResourceList, D extends Doneab
private final Boolean reloadingFromServer;
private final long gracePeriodSeconds;
private final DeletionPropagation propagationPolicy;
private final long watchRetryInitialBackoffMillis;
private final double watchRetryBackoffMultiplier;

protected String apiVersion;

Expand All @@ -114,6 +123,8 @@ protected BaseOperation(OperationContext ctx) {
this.labelsNotIn = ctx.getLabelsNotIn();
this.fields = ctx.getFields();
this.fieldsNot = ctx.getFieldsNot();
this.watchRetryInitialBackoffMillis = ctx.getWatchRetryInitialBackoffMillis();
this.watchRetryBackoffMultiplier = ctx.getWatchRetryBackoffMultiplier();
}

/**
Expand Down Expand Up @@ -976,6 +987,11 @@ public FilterWatchListDeletable<T, L, Boolean, Watch, Watcher<T>> withPropagatio
return newInstance(context.withPropagationPolicy(propagationPolicy));
}

/**
 * Configures the backoff applied between watch retries while waiting for a condition.
 *
 * <p>The initial backoff is converted to milliseconds, stored together with the multiplier
 * on a derived context, and that context is handed to {@code newInstance}.
 *
 * @param initialBackoff the value for the initial backoff
 * @param backoffUnit the unit in which {@code initialBackoff} is expressed
 * @param backoffMultiplier the multiplier stored as the watch retry backoff multiplier
 * @return the operation produced by {@code newInstance} for the updated context
 */
@Override
public BaseOperation<T, L, D, R> withWaitRetryBackoff(long initialBackoff, TimeUnit backoffUnit, double backoffMultiplier) {
  final long initialBackoffMillis = backoffUnit.toMillis(initialBackoff);
  return newInstance(
    context
      .withWatchRetryInitialBackoffMillis(initialBackoffMillis)
      .withWatchRetryBackoffMultiplier(backoffMultiplier));
}

/**
 * Returns the configuration class token; this implementation always returns {@code Config.class}.
 *
 * @return the {@link Config} class
 */
protected Class<? extends Config> getConfigType() {
return Config.class;
}
Expand Down Expand Up @@ -1053,61 +1069,53 @@ public Boolean isReady() {
return i instanceof HasMetadata && Readiness.isReady((HasMetadata)i);
}

/**
 * Waits until the resource is non-null, by delegating to
 * {@code waitUntilCondition} with a "not null" predicate.
 *
 * @param amount the maximum time to wait
 * @param timeUnit the unit of {@code amount}
 * @return whatever {@code waitUntilCondition} returns for the non-null condition
 * @throws InterruptedException if the delegated wait is interrupted
 */
protected T waitUntilExists(long amount, TimeUnit timeUnit) throws InterruptedException {
  final Predicate<T> exists = candidate -> candidate != null;
  return waitUntilCondition(exists, amount, timeUnit);
}

@Override
public T waitUntilReady(long amount, TimeUnit timeUnit) throws InterruptedException {

long timeoutInNanos = timeUnit.toNanos(amount);
long end = System.nanoTime() + timeoutInNanos;

while (System.nanoTime() < end) {
T item = fromServer().get();
try {
if (Readiness.isReady((HasMetadata) item)) {
return item;
}

Thread.sleep(500);
} catch (IllegalArgumentException illegalArgumentException) {
// This might be thrown if Resource passed doesn't comply with concept of "readiness"
throw illegalArgumentException;
}
}

T item = fromServer().get();
if (Readiness.isReady((HasMetadata) item)) {
return item;
}

throw new IllegalStateException(type.getSimpleName() + " with name:[" + name + "] in namespace:[" + namespace + "] not ready!");
return waitUntilCondition(resource -> Objects.nonNull(resource) && Readiness.isReady(resource), amount, timeUnit);
}

/**
 * Waits until the given condition holds for the resource.
 *
 * <p>The timeout is converted to nanoseconds and the work is delegated to
 * {@code waitUntilConditionWithRetries}, starting from the configured initial retry backoff.
 *
 * @param condition the predicate the resource has to satisfy
 * @param amount the maximum time to wait
 * @param timeUnit the unit of {@code amount}
 * @return the value returned by {@code waitUntilConditionWithRetries}
 * @throws InterruptedException if the delegated wait is interrupted
 */
@Override
public T waitUntilCondition(Predicate<T> condition, long amount, TimeUnit timeUnit)
    throws InterruptedException {
  final long timeoutNanos = timeUnit.toNanos(amount);
  return waitUntilConditionWithRetries(condition, timeoutNanos, watchRetryInitialBackoffMillis);
}

long timeoutInMillis = timeUnit.toNanos(amount);

long end = System.nanoTime() + timeoutInMillis;
while (System.nanoTime() < end) {
T item = get();
if (condition.test(item)) {
return item;
}

// in the future, this should probably be more intelligent
Thread.sleep(500);
}
private T waitUntilConditionWithRetries(Predicate<T> condition, long timeoutNanos, long backoffMillis)
throws InterruptedException {

T item = get();
T item = fromServer().get();
if (condition.test(item)) {
return item;
}

throw new IllegalArgumentException(type.getSimpleName() + " with name:[" + name + "] in namespace:[" + namespace + "] not found!");
long end = System.nanoTime() + timeoutNanos;

WaitForConditionWatcher<T> watcher = new WaitForConditionWatcher<>(condition);
Watch watch = item == null
? watch(new ListOptionsBuilder()
.withResourceVersion(null)
.build(), watcher)
: watch(item.getMetadata().getResourceVersion(), watcher);

try {
return watcher.getFuture()
.get(timeoutNanos, TimeUnit.NANOSECONDS);
} catch (ExecutionException e) {
if (e.getCause() instanceof WatchException && ((WatchException) e.getCause()).isShouldRetry()) {
watch.close();
LOG.debug("retryable watch exception encountered, retrying after {} millis", backoffMillis, e.getCause());
Thread.sleep(backoffMillis);
long newTimeout = end - System.nanoTime();
long newBackoff = (long) (backoffMillis * watchRetryBackoffMultiplier);
return waitUntilConditionWithRetries(condition, newTimeout, newBackoff);
}
throw KubernetesClientException.launderThrowable(e.getCause());
} catch (TimeoutException e) {
LOG.debug("ran out of time waiting for watcher, wait condition not met");
throw new IllegalArgumentException(type.getSimpleName() + " with name:[" + name + "] in namespace:[" + namespace + "] matching condition not found!");
} finally {
watch.close();
}
}

public void setType(Class<T> type) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,16 @@

package io.fabric8.kubernetes.client.dsl.base;

import java.lang.reflect.InvocationTargetException;
import java.util.concurrent.TimeUnit;

import io.fabric8.kubernetes.api.builder.Function;
import io.fabric8.kubernetes.api.model.DeletionPropagation;
import io.fabric8.kubernetes.api.model.Doneable;
import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.api.model.KubernetesResourceList;
import io.fabric8.kubernetes.api.model.ReplicationController;
import io.fabric8.kubernetes.client.KubernetesClientException;
import io.fabric8.kubernetes.client.KubernetesClientTimeoutException;
import io.fabric8.kubernetes.client.Watch;
import io.fabric8.kubernetes.client.dsl.Resource;
import io.fabric8.kubernetes.client.internal.readiness.Readiness;
import io.fabric8.kubernetes.client.internal.readiness.ReadinessWatcher;
import java.lang.reflect.InvocationTargetException;
import java.util.concurrent.TimeUnit;

public class HasMetadataOperation<T extends HasMetadata, L extends KubernetesResourceList, D extends Doneable<T>, R extends Resource<T, D>>
extends BaseOperation< T, L, D, R> {
Expand Down Expand Up @@ -157,53 +153,4 @@ public T patch(T item) {
}
throw KubernetesClientException.launderThrowable(forOperationType("patch"), caught);
}

/**
 * A wait method that combines watching and polling.
 * The need for that is that in some cases a pure watcher approach consistently fails.
 *
 * <p>Each call re-reads the resource from the server and returns it if it is already ready.
 * Otherwise it watches the resource, starting from its current resource version, for up to
 * {@code interval} nanoseconds. If that watch times out and attempts remain, the method calls
 * itself again with an interval capped at the time left before {@code started + amount}.
 *
 * @param i The number of further attempts allowed after this one; when it is zero or less, a timeout is rethrown.
 * @param started The {@link System#nanoTime()} reading, in nanoseconds, from which {@code amount} is counted.
 * @param interval The amount of time in nanoseconds to wait on each iteration.
 * @param amount The maximum amount of time in nanoseconds since {@code started} to wait.
 * @return The item fetched from the server if it is already ready, otherwise the value returned by the watcher.
 * @throws KubernetesClientTimeoutException if the watch times out and no attempts remain.
 */
protected T periodicWatchUntilReady(int i, long started, long interval, long amount) {
T item = fromServer().get();
if (Readiness.isReady(item)) {
return item;
}

// NOTE(review): item is dereferenced below without a null check — confirm callers guarantee the resource exists.
ReadinessWatcher<T> watcher = new ReadinessWatcher<>(item);
try (Watch watch = watch(item.getMetadata().getResourceVersion(), watcher)) {
try {
return watcher.await(interval, TimeUnit.NANOSECONDS);
} catch (KubernetesClientTimeoutException e) {
// Out of attempts: surface the timeout to the caller. Otherwise fall through and try again.
if (i <= 0) {
throw e;
}
}

// Clamp the next interval to the time left before started + amount, and never below zero.
long remaining = (started + amount) - System.nanoTime();
long next = Math.max(0, Math.min(remaining, interval));
// The recursive call runs inside this try-with-resources, so this watch is closed only after the nested attempt returns.
return periodicWatchUntilReady(i - 1, started, next, amount);
}
}

/**
 * Waits for the resource to become ready.
 *
 * <p>When readiness does not apply to this operation's type, the call is forwarded to the
 * superclass. Otherwise the method first waits for the resource to exist, then spends whatever
 * is left of the timeout in {@code periodicWatchUntilReady}: up to ten retries with an interval
 * of one tenth of the remaining time (at least one second, in nanoseconds), or a single
 * zero-length attempt when no time is left.
 *
 * @param amount the maximum time to wait
 * @param timeUnit the unit of {@code amount}
 * @return the value returned by {@code periodicWatchUntilReady} or by the superclass
 * @throws InterruptedException if one of the delegated waits is interrupted
 */
@Override
public T waitUntilReady(long amount, TimeUnit timeUnit) throws InterruptedException {
  if (!Readiness.isReadinessApplicable(getType())) {
    return super.waitUntilReady(amount, timeUnit);
  }

  final long waitStart = System.nanoTime();
  waitUntilExists(amount, timeUnit);
  final long elapsed = System.nanoTime() - waitStart;

  final long budgetLeft = timeUnit.toNanos(amount) - elapsed;
  if (budgetLeft > 0) {
    return periodicWatchUntilReady(10, System.nanoTime(), Math.max(budgetLeft / 10, 1000000000L), budgetLeft);
  }

  // Nothing left of the timeout: make one final attempt with no retries.
  return periodicWatchUntilReady(0, System.nanoTime(), 0, 0);
}
}
Loading