Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-34333][k8s] 1.18 backport of FLINK-34007 #24245

Merged
merged 6 commits into from
Feb 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions flink-core/src/main/java/org/apache/flink/util/ExecutorUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
Expand All @@ -38,8 +40,9 @@ public class ExecutorUtils {
* @param timeout to wait for the termination of all ExecutorServices
* @param unit of the timeout
* @param executorServices to shut down
* @return Tasks that were not executed prior to a {@link ExecutorService#shutdownNow()}.
*/
public static void gracefulShutdown(
public static List<Runnable> gracefulShutdown(
long timeout, TimeUnit unit, ExecutorService... executorServices) {
for (ExecutorService executorService : executorServices) {
executorService.shutdown();
Expand All @@ -50,22 +53,23 @@ public static void gracefulShutdown(
long timeLeft = unit.toMillis(timeout);
boolean hasTimeLeft = timeLeft > 0L;

final List<Runnable> outstandingTasks = new ArrayList<>();
for (ExecutorService executorService : executorServices) {
if (wasInterrupted || !hasTimeLeft) {
executorService.shutdownNow();
outstandingTasks.addAll(executorService.shutdownNow());
} else {
try {
if (!executorService.awaitTermination(timeLeft, TimeUnit.MILLISECONDS)) {
LOG.warn(
"ExecutorService did not terminate in time. Shutting it down now.");
executorService.shutdownNow();
outstandingTasks.addAll(executorService.shutdownNow());
}
} catch (InterruptedException e) {
LOG.warn(
"Interrupted while shutting down executor services. Shutting all "
+ "remaining ExecutorServices down now.",
e);
executorService.shutdownNow();
outstandingTasks.addAll(executorService.shutdownNow());

wasInterrupted = true;

Expand All @@ -76,6 +80,8 @@ public static void gracefulShutdown(
hasTimeLeft = timeLeft > 0L;
}
}

return outstandingTasks;
}

/**
Expand Down
2 changes: 1 addition & 1 deletion flink-kubernetes/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ under the License.
<packaging>jar</packaging>

<properties>
<kubernetes.client.version>6.6.2</kubernetes.client.version>
<kubernetes.client.version>6.9.2</kubernetes.client.version>
<surefire.module.config><!--
CommonTestUtils#setEnv
-->--add-opens=java.base/java.util=ALL-UNNAMED
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.kubernetes.kubeclient;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.util.FileUtils;
Expand Down Expand Up @@ -46,16 +47,8 @@ public static FlinkKubeClientFactory getInstance() {
return INSTANCE;
}

/**
* Create a Flink Kubernetes client with the given configuration.
*
* @param flinkConfig Flink configuration
* @param useCase Flink Kubernetes client use case (e.g. client, resourcemanager,
* kubernetes-ha-services)
* @return Return the Flink Kubernetes client with the specified configuration and dedicated IO
* executor.
*/
public FlinkKubeClient fromConfiguration(Configuration flinkConfig, String useCase) {
@VisibleForTesting
public NamespacedKubernetesClient createFabric8ioKubernetesClient(Configuration flinkConfig) {
final Config config;

final String kubeContext = flinkConfig.getString(KubernetesConfigOptions.CONTEXT);
Expand Down Expand Up @@ -95,11 +88,23 @@ public FlinkKubeClient fromConfiguration(Configuration flinkConfig, String useCa
config.setUserAgent(userAgent);
LOG.debug("Setting Kubernetes client namespace: {}, userAgent: {}", namespace, userAgent);

final NamespacedKubernetesClient client =
new KubernetesClientBuilder()
.withConfig(config)
.build()
.adapt(NamespacedKubernetesClient.class);
return new KubernetesClientBuilder()
.withConfig(config)
.build()
.adapt(NamespacedKubernetesClient.class);
}

/**
* Create a Flink Kubernetes client with the given configuration.
*
* @param flinkConfig Flink configuration
* @param useCase Flink Kubernetes client use case (e.g. client, resourcemanager,
* kubernetes-ha-services)
* @return Return the Flink Kubernetes client with the specified configuration and dedicated IO
* executor.
*/
public FlinkKubeClient fromConfiguration(Configuration flinkConfig, String useCase) {
final NamespacedKubernetesClient client = createFabric8ioKubernetesClient(flinkConfig);
final int poolSize =
flinkConfig.get(KubernetesConfigOptions.KUBERNETES_CLIENT_IO_EXECUTOR_POOL_SIZE);
return new Fabric8FlinkKubeClient(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.kubernetes.configuration.KubernetesLeaderElectionConfiguration;
import org.apache.flink.util.ExecutorUtils;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.apache.flink.util.concurrent.FutureUtils;

import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
import io.fabric8.kubernetes.client.extended.leaderelection.LeaderCallbacks;
Expand All @@ -31,8 +33,13 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.concurrent.GuardedBy;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
* Represent {@link KubernetesLeaderElector} in kubernetes. {@link LeaderElector#run()} is a
Expand All @@ -55,17 +62,32 @@ public class KubernetesLeaderElector {

private final Object lock = new Object();

private final ExecutorService executorService =
Executors.newFixedThreadPool(
3, new ExecutorThreadFactory("KubernetesLeaderElector-ExecutorService"));
private final NamespacedKubernetesClient kubernetesClient;
private final LeaderElectionConfig leaderElectionConfig;
private final ExecutorService executorService;

private final LeaderElector internalLeaderElector;
private CompletableFuture<?> currentLeaderElectionSession = FutureUtils.completedVoidFuture();

public KubernetesLeaderElector(
NamespacedKubernetesClient kubernetesClient,
KubernetesLeaderElectionConfiguration leaderConfig,
LeaderCallbackHandler leaderCallbackHandler) {
final LeaderElectionConfig leaderElectionConfig =
this(
kubernetesClient,
leaderConfig,
leaderCallbackHandler,
Executors.newSingleThreadExecutor(
new ExecutorThreadFactory("KubernetesLeaderElector-ExecutorService")));
}

@VisibleForTesting
public KubernetesLeaderElector(
NamespacedKubernetesClient kubernetesClient,
KubernetesLeaderElectionConfiguration leaderConfig,
LeaderCallbackHandler leaderCallbackHandler,
ExecutorService executorService) {
this.kubernetesClient = kubernetesClient;
this.leaderElectionConfig =
new LeaderElectionConfigBuilder()
.withName(leaderConfig.getConfigMapName())
.withLeaseDuration(leaderConfig.getLeaseDuration())
Expand All @@ -76,6 +98,7 @@ public KubernetesLeaderElector(
leaderConfig.getLockIdentity()))
.withRenewDeadline(leaderConfig.getRenewDeadline())
.withRetryPeriod(leaderConfig.getRetryPeriod())
.withReleaseOnCancel(true)
.withLeaderCallbacks(
new LeaderCallbacks(
leaderCallbackHandler::isLeader,
Expand All @@ -86,12 +109,27 @@ public KubernetesLeaderElector(
newLeader,
leaderConfig.getConfigMapName())))
.build();
internalLeaderElector =
new LeaderElector(kubernetesClient, leaderElectionConfig, executorService);
this.executorService = executorService;

LOG.info(
"Create KubernetesLeaderElector {} with lock identity {}.",
leaderConfig.getConfigMapName(),
leaderConfig.getLockIdentity());
"Create KubernetesLeaderElector on lock {}.",
leaderElectionConfig.getLock().describe());
}

/**
 * Replaces the current leader election session with a new one.
 *
 * <p>The session that is currently tracked is cancelled first. Afterwards a new {@link
 * LeaderElector} is created from the stored client, election config and executor service, and
 * the future returned by starting it becomes the tracked session.
 */
@GuardedBy("lock")
private void resetInternalLeaderElector() {
    // Drop the previous session before a new one is triggered.
    cancelCurrentLeaderElectionSession();

    final LeaderElector freshLeaderElector =
            new LeaderElector(kubernetesClient, leaderElectionConfig, executorService);
    currentLeaderElectionSession = freshLeaderElector.start();

    LOG.info(
            "Triggered leader election on lock {}.", leaderElectionConfig.getLock().describe());
}

/**
 * Cancels the future that represents the currently tracked leader election session, passing
 * {@code true} for the {@code mayInterruptIfRunning} argument.
 */
@GuardedBy("lock")
private void cancelCurrentLeaderElectionSession() {
    final boolean mayInterruptIfRunning = true;
    currentLeaderElectionSession.cancel(mayInterruptIfRunning);
}

public void run() {
Expand All @@ -100,14 +138,29 @@ public void run() {
LOG.debug(
"Ignoring KubernetesLeaderElector.run call because the leader elector has already been shut down.");
} else {
executorService.execute(internalLeaderElector::run);
resetInternalLeaderElector();
}
}
}

public void stop() {
synchronized (lock) {
executorService.shutdownNow();
// cancelling the current session needs to happen explicitly to allow the execution of
// code that handles the leader loss
cancelCurrentLeaderElectionSession();

// the shutdown of the executor needs to happen gracefully for scenarios where the
// release is called in the executorService. Interrupting this logic will result in the
// leadership-lost event not being sent to the client.
final List<Runnable> outStandingTasks =
ExecutorUtils.gracefulShutdown(30, TimeUnit.SECONDS, executorService);

if (!outStandingTasks.isEmpty()) {
LOG.warn(
"{} events were not processed before stopping the {} instance.",
outStandingTasks.size(),
KubernetesLeaderElector.class.getSimpleName());
}
}
}

Expand Down
52 changes: 26 additions & 26 deletions flink-kubernetes/src/main/resources/META-INF/NOTICE
Original file line number Diff line number Diff line change
Expand Up @@ -14,31 +14,31 @@ This project bundles the following dependencies under the Apache Software Licens
- com.squareup.okhttp3:logging-interceptor:3.14.9
- com.squareup.okhttp3:okhttp:3.14.9
- com.squareup.okio:okio:1.17.2
- io.fabric8:kubernetes-client:6.6.2
- io.fabric8:kubernetes-client-api:6.6.2
- io.fabric8:kubernetes-httpclient-okhttp:6.6.2
- io.fabric8:kubernetes-model-admissionregistration:6.6.2
- io.fabric8:kubernetes-model-apiextensions:6.6.2
- io.fabric8:kubernetes-model-apps:6.6.2
- io.fabric8:kubernetes-model-autoscaling:6.6.2
- io.fabric8:kubernetes-model-batch:6.6.2
- io.fabric8:kubernetes-model-certificates:6.6.2
- io.fabric8:kubernetes-model-common:6.6.2
- io.fabric8:kubernetes-model-coordination:6.6.2
- io.fabric8:kubernetes-model-core:6.6.2
- io.fabric8:kubernetes-model-discovery:6.6.2
- io.fabric8:kubernetes-model-events:6.6.2
- io.fabric8:kubernetes-model-extensions:6.6.2
- io.fabric8:kubernetes-model-flowcontrol:6.6.2
- io.fabric8:kubernetes-model-gatewayapi:6.6.2
- io.fabric8:kubernetes-model-metrics:6.6.2
- io.fabric8:kubernetes-model-networking:6.6.2
- io.fabric8:kubernetes-model-node:6.6.2
- io.fabric8:kubernetes-model-policy:6.6.2
- io.fabric8:kubernetes-model-rbac:6.6.2
- io.fabric8:kubernetes-model-resource:6.6.2
- io.fabric8:kubernetes-model-scheduling:6.6.2
- io.fabric8:kubernetes-model-storageclass:6.6.2
- io.fabric8:kubernetes-client:6.9.2
- io.fabric8:kubernetes-client-api:6.9.2
- io.fabric8:kubernetes-httpclient-okhttp:6.9.2
- io.fabric8:kubernetes-model-admissionregistration:6.9.2
- io.fabric8:kubernetes-model-apiextensions:6.9.2
- io.fabric8:kubernetes-model-apps:6.9.2
- io.fabric8:kubernetes-model-autoscaling:6.9.2
- io.fabric8:kubernetes-model-batch:6.9.2
- io.fabric8:kubernetes-model-certificates:6.9.2
- io.fabric8:kubernetes-model-common:6.9.2
- io.fabric8:kubernetes-model-coordination:6.9.2
- io.fabric8:kubernetes-model-core:6.9.2
- io.fabric8:kubernetes-model-discovery:6.9.2
- io.fabric8:kubernetes-model-events:6.9.2
- io.fabric8:kubernetes-model-extensions:6.9.2
- io.fabric8:kubernetes-model-flowcontrol:6.9.2
- io.fabric8:kubernetes-model-gatewayapi:6.9.2
- io.fabric8:kubernetes-model-metrics:6.9.2
- io.fabric8:kubernetes-model-networking:6.9.2
- io.fabric8:kubernetes-model-node:6.9.2
- io.fabric8:kubernetes-model-policy:6.9.2
- io.fabric8:kubernetes-model-rbac:6.9.2
- io.fabric8:kubernetes-model-resource:6.9.2
- io.fabric8:kubernetes-model-scheduling:6.9.2
- io.fabric8:kubernetes-model-storageclass:6.9.2
- io.fabric8:zjsonpatch:0.3.0
- org.snakeyaml:snakeyaml-engine:2.6
- org.snakeyaml:snakeyaml-engine:2.7
- org.yaml:snakeyaml:1.33
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ protected void mockPodEventWithLabels(
// mock four kinds of events.
String mockPath =
String.format(
"/api/v1/namespaces/%s/pods?labelSelector=%s&resourceVersion=%s&allowWatchBookmarks=true&watch=true",
"/api/v1/namespaces/%s/pods?allowWatchBookmarks=true&labelSelector=%s&resourceVersion=%s&watch=true",
namespace,
labels.entrySet().stream()
.map(entry -> entry.getKey() + "%3D" + entry.getValue())
Expand Down
Loading