diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/FlinkKubeClientFactory.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/FlinkKubeClientFactory.java index 99a9ffc9a33d1..c16508f8b9915 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/FlinkKubeClientFactory.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/FlinkKubeClientFactory.java @@ -18,6 +18,7 @@ package org.apache.flink.kubernetes.kubeclient; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.configuration.Configuration; import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; import org.apache.flink.util.FileUtils; @@ -46,16 +47,8 @@ public static FlinkKubeClientFactory getInstance() { return INSTANCE; } - /** - * Create a Flink Kubernetes client with the given configuration. - * - * @param flinkConfig Flink configuration - * @param useCase Flink Kubernetes client use case (e.g. client, resourcemanager, - * kubernetes-ha-services) - * @return Return the Flink Kubernetes client with the specified configuration and dedicated IO - * executor. - */ - public FlinkKubeClient fromConfiguration(Configuration flinkConfig, String useCase) { + @VisibleForTesting + public NamespacedKubernetesClient createFabric8ioKubernetesClient(Configuration flinkConfig) { final Config config; final String kubeContext = flinkConfig.getString(KubernetesConfigOptions.CONTEXT); @@ -95,11 +88,23 @@ public FlinkKubeClient fromConfiguration(Configuration flinkConfig, String useCa config.setUserAgent(userAgent); LOG.debug("Setting Kubernetes client namespace: {}, userAgent: {}", namespace, userAgent); - final NamespacedKubernetesClient client = - new KubernetesClientBuilder() - .withConfig(config) - .build() - .adapt(NamespacedKubernetesClient.class); + return new KubernetesClientBuilder() + .withConfig(config) + .build() + .adapt(NamespacedKubernetesClient.class); + } + + /** + * Create a Flink Kubernetes client with the given configuration. + * + * @param flinkConfig Flink configuration + * @param useCase Flink Kubernetes client use case (e.g. client, resourcemanager, + * kubernetes-ha-services) + * @return Return the Flink Kubernetes client with the specified configuration and dedicated IO + * executor. + */ + public FlinkKubeClient fromConfiguration(Configuration flinkConfig, String useCase) { + final NamespacedKubernetesClient client = createFabric8ioKubernetesClient(flinkConfig); final int poolSize = flinkConfig.get(KubernetesConfigOptions.KUBERNETES_CLIENT_IO_EXECUTOR_POOL_SIZE); return new Fabric8FlinkKubeClient( diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesLeaderElector.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesLeaderElector.java index d072e15cf18d7..dd0accf4369d5 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesLeaderElector.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesLeaderElector.java @@ -20,7 +20,9 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.kubernetes.configuration.KubernetesLeaderElectionConfiguration; +import org.apache.flink.util.ExecutorUtils; import org.apache.flink.util.concurrent.ExecutorThreadFactory; +import org.apache.flink.util.concurrent.FutureUtils; import io.fabric8.kubernetes.client.NamespacedKubernetesClient; import io.fabric8.kubernetes.client.extended.leaderelection.LeaderCallbacks; @@ -31,8 +33,13 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.concurrent.GuardedBy; + +import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; /** * Represent {@link KubernetesLeaderElector} in kubernetes. {@link LeaderElector#run()} is a @@ -55,17 +62,32 @@ public class KubernetesLeaderElector { private final Object lock = new Object(); - private final ExecutorService executorService = - Executors.newFixedThreadPool( - 3, new ExecutorThreadFactory("KubernetesLeaderElector-ExecutorService")); + private final NamespacedKubernetesClient kubernetesClient; + private final LeaderElectionConfig leaderElectionConfig; + private final ExecutorService executorService; - private final LeaderElector internalLeaderElector; + private CompletableFuture currentLeaderElectionSession = FutureUtils.completedVoidFuture(); public KubernetesLeaderElector( NamespacedKubernetesClient kubernetesClient, KubernetesLeaderElectionConfiguration leaderConfig, LeaderCallbackHandler leaderCallbackHandler) { - final LeaderElectionConfig leaderElectionConfig = + this( + kubernetesClient, + leaderConfig, + leaderCallbackHandler, + Executors.newSingleThreadExecutor( + new ExecutorThreadFactory("KubernetesLeaderElector-ExecutorService"))); + } + + @VisibleForTesting + public KubernetesLeaderElector( + NamespacedKubernetesClient kubernetesClient, + KubernetesLeaderElectionConfiguration leaderConfig, + LeaderCallbackHandler leaderCallbackHandler, + ExecutorService executorService) { + this.kubernetesClient = kubernetesClient; + this.leaderElectionConfig = new LeaderElectionConfigBuilder() .withName(leaderConfig.getConfigMapName()) .withLeaseDuration(leaderConfig.getLeaseDuration()) @@ -76,6 +98,7 @@ public KubernetesLeaderElector( leaderConfig.getLockIdentity())) .withRenewDeadline(leaderConfig.getRenewDeadline()) .withRetryPeriod(leaderConfig.getRetryPeriod()) + .withReleaseOnCancel(true) .withLeaderCallbacks( new LeaderCallbacks( leaderCallbackHandler::isLeader, @@ -86,12 +109,27 @@ public KubernetesLeaderElector( newLeader, leaderConfig.getConfigMapName()))) .build(); - internalLeaderElector = - new LeaderElector(kubernetesClient, leaderElectionConfig, executorService); + this.executorService = executorService; + LOG.info( - "Create KubernetesLeaderElector {} with lock identity {}.", - leaderConfig.getConfigMapName(), - leaderConfig.getLockIdentity()); + "Create KubernetesLeaderElector on lock {}.", + leaderElectionConfig.getLock().describe()); + } + + @GuardedBy("lock") + private void resetInternalLeaderElector() { + cancelCurrentLeaderElectionSession(); + + currentLeaderElectionSession = + new LeaderElector(kubernetesClient, leaderElectionConfig, executorService).start(); + + LOG.info( + "Triggered leader election on lock {}.", leaderElectionConfig.getLock().describe()); + } + + @GuardedBy("lock") + private void cancelCurrentLeaderElectionSession() { + currentLeaderElectionSession.cancel(true); } public void run() { @@ -100,14 +138,29 @@ public void run() { LOG.debug( "Ignoring KubernetesLeaderElector.run call because the leader elector has already been shut down."); } else { - executorService.execute(internalLeaderElector::run); + resetInternalLeaderElector(); } } } public void stop() { synchronized (lock) { - executorService.shutdownNow(); + // cancelling the current session needs to happen explicitly to allow the execution of + // code that handles the leader loss + cancelCurrentLeaderElectionSession(); + + // the shutdown of the executor needs to happen gracefully for scenarios where the + // release is called in the executorService. Interrupting this logic will result in the + // leadership-lost event not being sent to the client. + final List outStandingTasks = + ExecutorUtils.gracefulShutdown(30, TimeUnit.SECONDS, executorService); + + if (!outStandingTasks.isEmpty()) { + LOG.warn( + "{} events were not processed before stopping the {} instance.", + outStandingTasks.size(), + KubernetesLeaderElector.class.getSimpleName()); + } } } diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesLeaderElectorITCase.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesLeaderElectorITCase.java index a86008f693ac6..e167d05876525 100644 --- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesLeaderElectorITCase.java +++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesLeaderElectorITCase.java @@ -19,18 +19,23 @@ package org.apache.flink.kubernetes.kubeclient.resources; import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.testutils.ManuallyTriggeredScheduledExecutorService; import org.apache.flink.kubernetes.KubernetesExtension; +import org.apache.flink.kubernetes.configuration.KubernetesHighAvailabilityOptions; import org.apache.flink.kubernetes.configuration.KubernetesLeaderElectionConfiguration; import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient; import org.apache.flink.kubernetes.kubeclient.FlinkKubeClientFactory; +import io.fabric8.kubernetes.client.NamespacedKubernetesClient; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; +import java.time.Duration; import java.util.UUID; +import static org.apache.flink.core.testutils.FlinkAssertions.assertThatFuture; import static org.assertj.core.api.Assertions.assertThat; /** @@ -124,4 +129,107 @@ void testMultipleKubernetesLeaderElectors() throws Exception { } } } + + /** + * This test verifies that the {@link KubernetesLeaderElector} is able to handle scenario where + * the lease cannot be renewed. + * + *

See FLINK-34007 for further details. + */ + @Test + void testLeaderElectorLifecycleManagement() throws Exception { + final Configuration configuration = kubernetesExtension.getConfiguration(); + + try (final NamespacedKubernetesClient client = + kubeClientFactory.createFabric8ioKubernetesClient(configuration)) { + + // set a low timeout that makes the client stop renewing the leadership lease + final Duration renewTimeout = Duration.ofMillis(100); + configuration.set( + KubernetesHighAvailabilityOptions.KUBERNETES_RENEW_DEADLINE, renewTimeout); + + final String lockIdentity = UUID.randomUUID().toString(); + final KubernetesLeaderElectionConfiguration leaderConfig = + new KubernetesLeaderElectionConfiguration( + configMapName, lockIdentity, configuration); + final TestingLeaderCallbackHandler leadershipCallbackHandler = + new TestingLeaderCallbackHandler(lockIdentity); + + final ManuallyTriggeredScheduledExecutorService executorService = + new ManuallyTriggeredScheduledExecutorService(); + final KubernetesLeaderElector testInstance = + new KubernetesLeaderElector( + client, leaderConfig, leadershipCallbackHandler, executorService); + + // first leadership lifecycle initiation + testInstance.run(); + + // triggers acquiring the leadership + final Duration waitForNextTaskForever = Duration.ofDays(10); + executorService.trigger(waitForNextTaskForever); + + assertThatFuture(leadershipCallbackHandler.waitForNewLeaderAsync()) + .as("The leadership should be acquired eventually.") + .eventuallySucceeds(); + + // halt thread to reach the renew deadline + Thread.sleep(renewTimeout.plusSeconds(1).toMillis()); + + // triggers renew loop within fabric8io's LeaderElector + executorService.trigger(); + + assertThatFuture(leadershipCallbackHandler.waitForRevokeLeaderAsync()) + .as( + "The leadership should be lost eventually due to the renewal loop being stopped.") + .eventuallySucceeds(); + + // revoking the leadership initiates another leadership lifecycle + testInstance.run(); + executorService.trigger(waitForNextTaskForever); + + assertThatFuture(leadershipCallbackHandler.waitForNewLeaderAsync()) + .as("The leadership should be acquired eventually again."); + } + } + + @Test + void testKubernetesLeaderElectorSendingLeadershipLostSignalOnStop() { + final Configuration configuration = kubernetesExtension.getConfiguration(); + + try (final NamespacedKubernetesClient client = + kubeClientFactory.createFabric8ioKubernetesClient(configuration)) { + + final String lockIdentity = UUID.randomUUID().toString(); + final KubernetesLeaderElectionConfiguration leaderConfig = + new KubernetesLeaderElectionConfiguration( + configMapName, lockIdentity, configuration); + final TestingLeaderCallbackHandler leadershipCallbackHandler = + new TestingLeaderCallbackHandler(lockIdentity); + + final ManuallyTriggeredScheduledExecutorService executorService = + new ManuallyTriggeredScheduledExecutorService(); + final KubernetesLeaderElector testInstance = + new KubernetesLeaderElector( + client, leaderConfig, leadershipCallbackHandler, executorService); + + // initiate leadership lifecycle + testInstance.run(); + + final Duration waitForNextTaskForever = Duration.ofDays(10); + executorService.trigger(waitForNextTaskForever); + assertThatFuture(leadershipCallbackHandler.waitForNewLeaderAsync()) + .as("Leadership should be acquired eventually.") + .eventuallySucceeds(); + + testInstance.stop(); + + assertThat(leadershipCallbackHandler.hasLeadership()) + .as("Leadership should be lost right away after stopping the test instance.") + .isFalse(); + + assertThatFuture(leadershipCallbackHandler.waitForRevokeLeaderAsync()) + .as("There should be a leadership lost event being received eventually.") + .eventuallySucceeds(); + } + } }