Skip to content
This repository has been archived by the owner on Jul 12, 2023. It is now read-only.

Commit

Permalink
Merge pull request #810 from spotify/rewatch
Browse files Browse the repository at this point in the history
Rewatch
  • Loading branch information
honnix authored Nov 21, 2019
2 parents b53d5a1 + 0f58002 commit 24b9c83
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

/**
* A {@link DockerRunner} implementation that submits container executions to a Kubernetes cluster.
Expand Down Expand Up @@ -156,6 +157,7 @@ class KubernetesDockerRunner implements DockerRunner {
private final Duration podDeletionDelay;
private final Time time;
private final ExecutorService executor;
private Watch watch;

@VisibleForTesting
KubernetesDockerRunner(String id, Fabric8KubernetesClient client, StateManager stateManager, Stats stats,
Expand Down Expand Up @@ -527,23 +529,21 @@ boolean shouldDeletePod(WorkflowInstance workflowInstance, Pod pod, String reaso

@Override
// Shuts the runner down: tears down the active pod watch first, then closes
// every resource registered with the closer. Part of the watch-reconnect
// change in this commit — closeWatch() replaces the old closer-registered watch.
public void close() throws IOException {
// Close the watch before the closer so no reconnect races a half-closed runner.
closeWatch();
closer.close();
}

// Starts background work for the runner: periodic pod cleanup, the pod
// watch, and periodic processing of buffered pod updates.
// NOTE(review): this span is a rendered diff — `final Watch watch;` (old local)
// and `closer.register(watch);` appear alongside the new field assignment;
// confirm against the actual post-commit source before relying on it verbatim.
void init() {
scheduleWithJitter(this::cleanupPods, scheduledExecutor, cleanupPodsInterval);

final PodWatcher watcher = new PodWatcher();
final Watch watch;
try {
watch = client.watchPods(watcher);
} catch (Throwable t) {
// Deliberate best-effort: if the watch cannot be established, log and fall
// back to the polling path instead of failing initialization.
LOG.warn("Failed to watch pods and will rely on polling.", t);
return;
}

closer.register(watch);

scheduleWithJitter(watcher::processPodUpdates, scheduledExecutor, PROCESS_POD_UPDATE_INTERVAL);
}

Expand Down Expand Up @@ -688,6 +688,8 @@ private String readStatus(Pod pod) {

public class PodWatcher implements Watcher<Pod> {

private static final int RECONNECT_DELAY_SECONDS = 1;

private final ConcurrentMap<String, WorkflowInstance> podUpdates = new ConcurrentHashMap<>();

/**
Expand Down Expand Up @@ -753,9 +755,33 @@ private void processPodUpdate(String podName, WorkflowInstance instance) {
emitPodEvents(pod, runState.get());
}

// Re-establishes the pod watch after the previous one closed. Closes any
// stale watch first, then opens a new one with this watcher. On failure the
// attempt is rescheduled after RECONNECT_DELAY_SECONDS rather than thrown,
// so a flaky API server cannot kill the watcher permanently.
private void reconnect() {
LOG.info("Re-establishing pod watcher");

// Release the old (closed or half-closed) watch before opening a new one.
closeWatch();

try {
// Assigns the enclosing runner's `watch` field; closeWatch()/close() use it later.
watch = client.watchPods(this);
} catch (Throwable e) {
// Broad catch is intentional: any failure just schedules another attempt.
LOG.warn("Retry threw", e);
scheduleReconnect();
}
}

private void scheduleReconnect() {
  // Retry establishing the pod watch after a fixed delay instead of
  // hammering the API server in a tight loop.
  scheduledExecutor.schedule(() -> reconnect(), RECONNECT_DELAY_SECONDS, TimeUnit.SECONDS);
}

@Override
// Fabric8 watcher callback invoked when the server-side watch terminates
// (e is null on graceful close, per the Watcher contract — TODO confirm for
// this client version). Always schedules a reconnect so pod updates keep flowing.
public void onClose(KubernetesClientException e) {
LOG.warn("Watch closed", e);
scheduleReconnect();
}
}

// Tears down the active pod watch, if one was ever established.
// Safe to call when no watch exists (e.g. initial watchPods() failed).
private void closeWatch() {
  if (watch == null) {
    return;
  }
  watch.close();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,17 +238,32 @@ public void setUp() {
@After
// Closes the runner after each test and asserts the watch was torn down with
// it — atLeastOnce() because reconnect-exercising tests close it more than once.
public void tearDown() throws Exception {
kdr.close();
verify(podWatch, atLeastOnce()).close();
}

@Test
// Verifies the reconnect loop: after onClose, the first re-watch attempt
// throws, a retry is scheduled, and the second attempt succeeds.
// NOTE(review): this span is a rendered diff — the next line is the OLD method
// signature (renamed in this commit) followed by the new one; confirm against
// the actual post-commit source before relying on it verbatim.
public void shouldFailToInitialize() {
public void shouldReconnectUponWatcherClose() {
// Drop the stubbing from setUp so the throw-then-succeed sequence below applies.
Mockito.reset(k8sClient);
when(k8sClient.watchPods(any())).thenThrow(new KubernetesClientException("Forced failure")).thenReturn(podWatch);

podWatcher.onClose(new KubernetesClientException("Forced failure"));

// Advance the deterministic scheduler past RECONNECT_DELAY_SECONDS twice:
// first attempt throws and reschedules, second attempt returns podWatch.
executor.tick(2, TimeUnit.SECONDS);
// closeWatch() runs once per reconnect attempt, hence two close() calls.
verify(podWatch, times(2)).close();
verify(k8sClient, times(2)).watchPods(any());
}

@Test
// Verifies that when the initial watchPods() fails, init() falls back to
// polling (a task is scheduled) and close() does not touch a never-opened watch.
// NOTE(review): this span is a rendered diff — the two constructor lines below
// are the old (`kdr = ...`) and new (`var kdr = ...`) versions of the same
// statement; confirm against the actual post-commit source before relying on it.
public void shouldFailToInitialize() throws IOException {
var spiedExecutor = spy(executor);
when(k8sClient.watchPods(any())).thenThrow(new KubernetesClientException("Forced failure"));
kdr = new KubernetesDockerRunner(RUNNER_ID, k8sClient, stateManager, stats, serviceAccountSecretManager,
var kdr = new KubernetesDockerRunner(RUNNER_ID, k8sClient, stateManager, stats, serviceAccountSecretManager,
debug, STYX_ENVIRONMENT, SECRET_WHITELIST, POD_CLEANUP_INTERVAL_SECONDS, POD_DELETION_DELAY_SECONDS, time,
spiedExecutor);
kdr.init();
// init() still schedules background work even though the watch failed.
verify(spiedExecutor).schedule(any(Runnable.class), anyLong(), any());
kdr.close();
// No watch was ever established, so nothing should be closed.
verify(podWatch, never()).close();
}

@Test
Expand Down

0 comments on commit 24b9c83

Please sign in to comment.