Skip to content

Commit

Permalink
Refactor waitUntilCondition() for lists to use common fork-join pool.
Browse files Browse the repository at this point in the history
This should be less resource-intensive than using a new thread for every list item, regardless of the size of the list.
Also revert previous change for preserving thread interrupt status, because the thread is exiting anyway.
  • Loading branch information
chrisr3 authored and rohanKanojia committed Sep 29, 2020
1 parent f3f4a3a commit 65d3282
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -524,7 +524,7 @@ public FilterWatchListDeletable<T, L, Boolean, Watch> withField(String key, Stri
}

@Override
public FilterWatchListDeletable<T, L, Boolean, Watch, Watcher<T>> withInvolvedObject(ObjectReference objectReference) {
public FilterWatchListDeletable<T, L, Boolean, Watch> withInvolvedObject(ObjectReference objectReference) {
if (objectReference != null) {
if (objectReference.getName() != null) {
fields.put(INVOLVED_OBJECT_NAME, objectReference.getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,28 +38,29 @@
import io.fabric8.kubernetes.client.utils.Serialization;
import io.fabric8.kubernetes.client.utils.Utils;

import java.net.HttpURLConnection;
import java.util.function.Predicate;
import okhttp3.OkHttpClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;

public class NamespaceVisitFromServerGetWatchDeleteRecreateWaitApplicableListImpl extends OperationSupport implements ParameterNamespaceListVisitFromServerGetDeleteRecreateWaitApplicable<HasMetadata, Boolean>,
Waitable<List<HasMetadata>, HasMetadata>, Readiable {
Expand Down Expand Up @@ -139,34 +140,27 @@ public List<HasMetadata> waitUntilCondition(Predicate<HasMetadata> condition,
}

final List<CompletableFuture<HasMetadata>> futures = new ArrayList<>(items.size());
final ExecutorService executor = Executors.newFixedThreadPool(items.size());
for (final HasMetadata meta : items) {
final ResourceHandler<HasMetadata, HasMetadataVisitiableBuilder> h = handlerOf(meta);
futures.add(CompletableFuture.supplyAsync(() -> {
try {
return h.waitUntilCondition(client, config, meta.getMetadata().getNamespace(), meta, condition, amount, timeUnit);
} catch (Exception e) {
//consider all errors as not ready.
logAsNotReady(e, meta);
return null;
}
}));
}

try {
for (final HasMetadata meta : items) {
final ResourceHandler<HasMetadata, HasMetadataVisitiableBuilder> h = handlerOf(meta);
futures.add(CompletableFuture.supplyAsync(() -> {
try {
return h.waitUntilCondition(client, config, meta.getMetadata().getNamespace(), meta, condition, amount, timeUnit);
} catch (InterruptedException e) {
// Don't forget that this thread was interrupted.
Thread.currentThread().interrupt();

//consider all errors as not ready.
logAsNotReady(e, meta);
return null;
} catch (Exception e) {
//consider all errors as not ready.
logAsNotReady(e, meta);
return null;
}
}, executor));
}

// Wait for all futures to complete, remembering that every future
// has been given the same timeout value.
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
} finally {
executor.shutdown();
// All of the individual futures have the same timeout period,
// but they may not all necessarily start executing together.
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(amount, timeUnit);
} catch (TimeoutException | ExecutionException e) {
// We don't allow individual futures to complete with an exception,
// which means we should never catch ExecutionException here.
LOGGER.debug("Global timeout reached", e);
}

final List<HasMetadata> results = new ArrayList<>();
Expand All @@ -177,13 +171,19 @@ public List<HasMetadata> waitUntilCondition(Predicate<HasMetadata> condition,
int i = 0;
for (final HasMetadata meta : items) {
try {
HasMetadata result = futures.get(i).get();
CompletableFuture<HasMetadata> future = futures.get(i);
HasMetadata result = future.getNow(null);
if (result != null) {
results.add(result);
} else {
// Cancel this future, just in case it never had
// an opportunity to execute in the first place.
future.cancel(true);
itemsWithConditionNotMatched.add(meta);
}
} catch (ExecutionException e) {
} catch (CompletionException e) {
// We should never reach here, because individual futures
// aren't allowed to complete with an exception.
itemsWithConditionNotMatched.add(meta);
logAsNotReady(e.getCause(), meta);
}
Expand Down Expand Up @@ -455,26 +455,4 @@ private static <T> ResourceHandler handlerOf(T item) {
throw new IllegalArgumentException("Could not find a registered handler for item: [" + item + "].");
}
}

/**
* Waits until the latch reaches to zero and then checks if the expected result
* @param latch The latch.
* @param expected The expected number.
* @param actual The actual number.
* @param amount The amount of time to wait.
* @param timeUnit The timeUnit.
* @return
*/
private static boolean checkConditionMetForAll(CountDownLatch latch, int expected, AtomicInteger actual, long amount, TimeUnit timeUnit) {
try {
if (latch.await(amount, timeUnit)) {
return actual.intValue() == expected;
}
return false;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return false;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ protected Deployment createClone(Deployment obj, String newName, String newDeplo

@Override
protected PodList listSelectedPods(Deployment obj) {
FilterWatchListDeletable<Pod, PodList, Boolean, Watch, Watcher<Pod>> podLister = pods().inNamespace(namespace);
FilterWatchListDeletable<Pod, PodList, Boolean, Watch> podLister = pods().inNamespace(namespace);
if (obj.getSpec().getSelector().getMatchLabels() != null) {
podLister.withLabels(obj.getSpec().getSelector().getMatchLabels());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ protected ReplicaSet createClone(ReplicaSet obj, String newName, String newDeplo

@Override
protected PodList listSelectedPods(ReplicaSet obj) {
FilterWatchListDeletable<Pod, PodList, Boolean, Watch, Watcher<Pod>> podLister = pods().inNamespace(namespace);
FilterWatchListDeletable<Pod, PodList, Boolean, Watch> podLister = pods().inNamespace(namespace);
if (obj.getSpec().getSelector().getMatchLabels() != null) {
podLister.withLabels(obj.getSpec().getSelector().getMatchLabels());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ private FilterWatchListDeletable<Pod, PodList, Boolean, Watch> getMockPodFilterO
public FilterWatchListDeletable<Pod, PodList, Boolean, Watch> withLabelSelector(LabelSelector selector) { return null; }

@Override
public FilterWatchListDeletable<Pod, PodList, Boolean, Watch, Watcher<Pod>> withInvolvedObject(ObjectReference objectReference) { return null; }
public FilterWatchListDeletable<Pod, PodList, Boolean, Watch> withInvolvedObject(ObjectReference objectReference) { return null; }

@Override
public PodList list() { return getMockPodList(controllerUid); }
Expand Down

0 comments on commit 65d3282

Please sign in to comment.