[FLINK-34007][k8s] fabric8io LeaderElector is created with every new #run() call

fabric8io kubernetes-client v5.12.4 allowed us to reuse the LeaderElector. With v6.6.2 (fabric8io/kubernetes-client#4125) this behavior changed: a LeaderElector can only be used until the leadership is lost.
An ITCase is added to cover the scenario where the leadership is lost.
XComp committed Feb 1, 2024
1 parent 8ff011f commit 927972f
Showing 3 changed files with 184 additions and 27 deletions.
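
For context, a minimal sketch of the usage pattern this commit moves to (not taken from the patch itself; the placeholder names and the wrapper class are assumptions, while the LeaderElector constructor and start() signature match the code in the diff below): with kubernetes-client 6.x a LeaderElector is single-use, so each leadership session gets its own instance and is ended by cancelling the future returned by start().

import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
import io.fabric8.kubernetes.client.extended.leaderelection.LeaderElectionConfig;
import io.fabric8.kubernetes.client.extended.leaderelection.LeaderElector;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;

class LeaderElectionSessionSketch {

    // Starts one leadership session; a later session must build a new LeaderElector,
    // because kubernetes-client 6.x treats an elector as single-use.
    CompletableFuture<?> startSession(
            NamespacedKubernetesClient client,
            LeaderElectionConfig config,
            ExecutorService executor) {
        return new LeaderElector(client, config, executor).start();
    }

    // Cancelling the returned future ends the session; with withReleaseOnCancel(true)
    // (as configured in the diff below) the lock is released and the leadership-lost
    // callback still runs.
    void endSession(CompletableFuture<?> session) {
        session.cancel(true);
    }
}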
FlinkKubeClientFactory.java
@@ -18,6 +18,7 @@

package org.apache.flink.kubernetes.kubeclient;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.util.FileUtils;
@@ -46,16 +47,8 @@ public static FlinkKubeClientFactory getInstance() {
return INSTANCE;
}

/**
* Create a Flink Kubernetes client with the given configuration.
*
* @param flinkConfig Flink configuration
* @param useCase Flink Kubernetes client use case (e.g. client, resourcemanager,
* kubernetes-ha-services)
* @return Return the Flink Kubernetes client with the specified configuration and dedicated IO
* executor.
*/
public FlinkKubeClient fromConfiguration(Configuration flinkConfig, String useCase) {
@VisibleForTesting
public NamespacedKubernetesClient createFabric8ioKubernetesClient(Configuration flinkConfig) {
final Config config;

final String kubeContext = flinkConfig.get(KubernetesConfigOptions.CONTEXT);
@@ -94,11 +87,23 @@ public FlinkKubeClient fromConfiguration(Configuration flinkConfig, String useCa
config.setUserAgent(userAgent);
LOG.debug("Setting Kubernetes client namespace: {}, userAgent: {}", namespace, userAgent);

final NamespacedKubernetesClient client =
new KubernetesClientBuilder()
.withConfig(config)
.build()
.adapt(NamespacedKubernetesClient.class);
return new KubernetesClientBuilder()
.withConfig(config)
.build()
.adapt(NamespacedKubernetesClient.class);
}

/**
* Create a Flink Kubernetes client with the given configuration.
*
* @param flinkConfig Flink configuration
* @param useCase Flink Kubernetes client use case (e.g. client, resourcemanager,
* kubernetes-ha-services)
* @return Return the Flink Kubernetes client with the specified configuration and dedicated IO
* executor.
*/
public FlinkKubeClient fromConfiguration(Configuration flinkConfig, String useCase) {
final NamespacedKubernetesClient client = createFabric8ioKubernetesClient(flinkConfig);
final int poolSize =
flinkConfig.get(KubernetesConfigOptions.KUBERNETES_CLIENT_IO_EXECUTOR_POOL_SIZE);
return new Fabric8FlinkKubeClient(
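As a usage note for the factory change above, a hedged sketch (the Configuration contents and the "kubernetes-ha-services" use case string are illustrative; the method names are those shown in the diff): fromConfiguration keeps its existing contract, while the new @VisibleForTesting createFabric8ioKubernetesClient exposes the underlying fabric8io client, e.g. for the ITCase further below.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
import org.apache.flink.kubernetes.kubeclient.FlinkKubeClientFactory;

import io.fabric8.kubernetes.client.NamespacedKubernetesClient;

class FlinkKubeClientFactorySketch {

    void sketch(Configuration flinkConfig) throws Exception {
        FlinkKubeClientFactory factory = FlinkKubeClientFactory.getInstance();

        // Production path: unchanged public API, returns the Flink-level client.
        try (FlinkKubeClient flinkClient =
                factory.fromConfiguration(flinkConfig, "kubernetes-ha-services")) {
            // use the Flink Kubernetes client
        }

        // Test path: direct access to the fabric8io client, now factored out.
        try (NamespacedKubernetesClient fabric8Client =
                factory.createFabric8ioKubernetesClient(flinkConfig)) {
            // interact with the Kubernetes API directly
        }
    }
}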
KubernetesLeaderElector.java
@@ -21,7 +21,9 @@
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.kubernetes.configuration.KubernetesLeaderElectionConfiguration;
import org.apache.flink.kubernetes.utils.KubernetesUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.apache.flink.util.concurrent.FutureUtils;

import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
@@ -33,6 +35,10 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.concurrent.GuardedBy;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@@ -59,17 +65,32 @@ public class KubernetesLeaderElector {

private final Object lock = new Object();

private final ExecutorService executorService =
Executors.newFixedThreadPool(
3, new ExecutorThreadFactory("KubernetesLeaderElector-ExecutorService"));
private final NamespacedKubernetesClient kubernetesClient;
private final LeaderElectionConfig leaderElectionConfig;
private final ExecutorService executorService;

private final LeaderElector internalLeaderElector;
private CompletableFuture<?> currentLeaderElectionSession = FutureUtils.completedVoidFuture();

public KubernetesLeaderElector(
NamespacedKubernetesClient kubernetesClient,
KubernetesLeaderElectionConfiguration leaderConfig,
LeaderCallbackHandler leaderCallbackHandler) {
final LeaderElectionConfig leaderElectionConfig =
this(
kubernetesClient,
leaderConfig,
leaderCallbackHandler,
Executors.newSingleThreadExecutor(
new ExecutorThreadFactory("KubernetesLeaderElector-ExecutorService")));
}

@VisibleForTesting
public KubernetesLeaderElector(
NamespacedKubernetesClient kubernetesClient,
KubernetesLeaderElectionConfiguration leaderConfig,
LeaderCallbackHandler leaderCallbackHandler,
ExecutorService executorService) {
this.kubernetesClient = kubernetesClient;
this.leaderElectionConfig =
new LeaderElectionConfigBuilder()
.withName(leaderConfig.getConfigMapName())
.withLeaseDuration(leaderConfig.getLeaseDuration())
@@ -88,6 +109,7 @@ public KubernetesLeaderElector(
leaderConfig.getLockIdentity()))
.withRenewDeadline(leaderConfig.getRenewDeadline())
.withRetryPeriod(leaderConfig.getRetryPeriod())
.withReleaseOnCancel(true)
.withLeaderCallbacks(
new LeaderCallbacks(
leaderCallbackHandler::isLeader,
@@ -98,12 +120,27 @@
newLeader,
leaderConfig.getConfigMapName())))
.build();
internalLeaderElector =
new LeaderElector(kubernetesClient, leaderElectionConfig, executorService);
this.executorService = executorService;

LOG.info(
"Create KubernetesLeaderElector {} with lock identity {}.",
leaderConfig.getConfigMapName(),
leaderConfig.getLockIdentity());
"Create KubernetesLeaderElector on lock {}.",
leaderElectionConfig.getLock().describe());
}

@GuardedBy("lock")
private void resetInternalLeaderElector() {
cancelCurrentLeaderElectionSession();

currentLeaderElectionSession =
new LeaderElector(kubernetesClient, leaderElectionConfig, executorService).start();

LOG.info(
"Triggered leader election on lock {}.", leaderElectionConfig.getLock().describe());
}

@GuardedBy("lock")
private void cancelCurrentLeaderElectionSession() {
currentLeaderElectionSession.cancel(true);
}

public void run() {
@@ -112,14 +149,21 @@ public void run() {
LOG.debug(
"Ignoring KubernetesLeaderElector.run call because the leader elector has already been shut down.");
} else {
executorService.execute(internalLeaderElector::run);
resetInternalLeaderElector();
}
}
}

public void stop() {
synchronized (lock) {
executorService.shutdownNow();
// cancelling the current session needs to happen explicitly to allow the execution of
// code that handles the leader loss
cancelCurrentLeaderElectionSession();

final List<Runnable> outstandingTasks = executorService.shutdownNow();
Preconditions.checkState(
outstandingTasks.isEmpty(),
"All tasks that handle the leadership revocation should have been executed.");
}
}

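To summarise the new lifecycle of the class above (a sketch under the assumption that a connected client, a KubernetesLeaderElectionConfiguration, and a LeaderCallbackHandler implementation are supplied by the caller; the ITCase below exercises the same sequence against a real cluster): each run() starts a fresh leader-election session, so leadership can be re-acquired after it was lost, and stop() cancels the active session before shutting the executor down.

import org.apache.flink.kubernetes.configuration.KubernetesLeaderElectionConfiguration;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector;

import io.fabric8.kubernetes.client.NamespacedKubernetesClient;

class LeaderElectorLifecycleSketch {

    void sketch(
            NamespacedKubernetesClient kubernetesClient,
            KubernetesLeaderElectionConfiguration leaderConfig,
            KubernetesLeaderElector.LeaderCallbackHandler callbackHandler) {

        KubernetesLeaderElector elector =
                new KubernetesLeaderElector(kubernetesClient, leaderConfig, callbackHandler);

        elector.run(); // first leadership session

        // If the lease cannot be renewed, notLeader() fires on the handler and run() may be
        // called again; each call now builds a fresh fabric8io LeaderElector internally.
        elector.run();

        // stop() cancels the active session first, so the leadership-lost callback still
        // runs, and only then shuts down the executor.
        elector.stop();
    }
}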
KubernetesLeaderElectorITCase.java
@@ -19,18 +19,23 @@
package org.apache.flink.kubernetes.kubeclient.resources;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.ManuallyTriggeredScheduledExecutorService;
import org.apache.flink.kubernetes.KubernetesExtension;
import org.apache.flink.kubernetes.configuration.KubernetesHighAvailabilityOptions;
import org.apache.flink.kubernetes.configuration.KubernetesLeaderElectionConfiguration;
import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
import org.apache.flink.kubernetes.kubeclient.FlinkKubeClientFactory;

import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import java.time.Duration;
import java.util.UUID;

import static org.apache.flink.core.testutils.FlinkAssertions.assertThatFuture;
import static org.apache.flink.kubernetes.utils.Constants.LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY;
import static org.apache.flink.kubernetes.utils.Constants.LABEL_CONFIGMAP_TYPE_KEY;
import static org.assertj.core.api.Assertions.assertThat;
@@ -159,4 +164,107 @@ void testClusterConfigMapLabelsAreSet() throws Exception {
}
}
}

/**
* This test verifies that the {@link KubernetesLeaderElector} is able to handle the scenario
* where the lease cannot be renewed.
*
* <p>See FLINK-34007 for further details.
*/
@Test
void testLeaderElectorLifecycleManagement() throws Exception {
final Configuration configuration = kubernetesExtension.getConfiguration();

try (final NamespacedKubernetesClient client =
kubeClientFactory.createFabric8ioKubernetesClient(configuration)) {

// set a low timeout that makes the client stop renewing the leadership lease
final Duration renewTimeout = Duration.ofMillis(100);
configuration.set(
KubernetesHighAvailabilityOptions.KUBERNETES_RENEW_DEADLINE, renewTimeout);

final String lockIdentity = UUID.randomUUID().toString();
final KubernetesLeaderElectionConfiguration leaderConfig =
new KubernetesLeaderElectionConfiguration(
configMapName, lockIdentity, configuration);
final TestingLeaderCallbackHandler leadershipCallbackHandler =
new TestingLeaderCallbackHandler(lockIdentity);

final ManuallyTriggeredScheduledExecutorService executorService =
new ManuallyTriggeredScheduledExecutorService();
final KubernetesLeaderElector testInstance =
new KubernetesLeaderElector(
client, leaderConfig, leadershipCallbackHandler, executorService);

// first leadership lifecycle initiation
testInstance.run();

// triggers acquiring the leadership
final Duration waitForNextTaskForever = Duration.ofDays(10);
executorService.trigger(waitForNextTaskForever);

assertThatFuture(leadershipCallbackHandler.waitForNewLeaderAsync())
.as("The leadership should be acquired eventually.")
.eventuallySucceeds();

// halt thread to reach the renew deadline
Thread.sleep(renewTimeout.plusSeconds(1).toMillis());

// triggers renew loop within fabric8io's LeaderElector
executorService.trigger();

assertThatFuture(leadershipCallbackHandler.waitForRevokeLeaderAsync())
.as(
"The leadership should be lost eventually due to the renewal loop being stopped.")
.eventuallySucceeds();

// revoking the leadership initiates another leadership lifecycle
testInstance.run();
executorService.trigger(waitForNextTaskForever);

assertThatFuture(leadershipCallbackHandler.waitForNewLeaderAsync())
.as("The leadership should be acquired eventually again.");
}
}

@Test
void testKubernetesLeaderElectorSendingLeadershipLostSignalOnStop() {
final Configuration configuration = kubernetesExtension.getConfiguration();

try (final NamespacedKubernetesClient client =
kubeClientFactory.createFabric8ioKubernetesClient(configuration)) {

final String lockIdentity = UUID.randomUUID().toString();
final KubernetesLeaderElectionConfiguration leaderConfig =
new KubernetesLeaderElectionConfiguration(
configMapName, lockIdentity, configuration);
final TestingLeaderCallbackHandler leadershipCallbackHandler =
new TestingLeaderCallbackHandler(lockIdentity);

final ManuallyTriggeredScheduledExecutorService executorService =
new ManuallyTriggeredScheduledExecutorService();
final KubernetesLeaderElector testInstance =
new KubernetesLeaderElector(
client, leaderConfig, leadershipCallbackHandler, executorService);

// initiate leadership lifecycle
testInstance.run();

final Duration waitForNextTaskForever = Duration.ofDays(10);
executorService.trigger(waitForNextTaskForever);
assertThatFuture(leadershipCallbackHandler.waitForNewLeaderAsync())
.as("Leadership should be acquired eventually.")
.eventuallySucceeds();

testInstance.stop();

assertThat(leadershipCallbackHandler.hasLeadership())
.as("Leadership should be lost right away after stopping the test instance.")
.isFalse();

assertThatFuture(leadershipCallbackHandler.waitForRevokeLeaderAsync())
.as("There should be a leadership lost event being received eventually.")
.eventuallySucceeds();
}
}
}
