Skip to content

Commit

Permalink
Support the ${jobName} variable in Flink configuration. (#4062)
Browse files Browse the repository at this point in the history
* feature: support jobName var in flink conf

* fix issue

---------

Co-authored-by: benjobs <[email protected]>
  • Loading branch information
lintingbin and wolfboys authored Sep 15, 2024
1 parent 076578c commit aaf7c74
Show file tree
Hide file tree
Showing 7 changed files with 53 additions and 11 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.streampark.common.utils;

public class CommonUtils {
private CommonUtils() {}

public static String fixedValueBaseVar(String configValue, String jobName) {
return configValue.replace("${jobName}", jobName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.streampark.common.conf.FlinkVersion;
import org.apache.streampark.common.util.DeflaterUtils;
import org.apache.streampark.common.util.PropertiesUtils;
import org.apache.streampark.common.utils.CommonUtils;
import org.apache.streampark.console.base.exception.ApiAlertException;
import org.apache.streampark.console.base.exception.ApiDetailException;

Expand Down Expand Up @@ -115,11 +116,14 @@ public Map<String, String> convertFlinkYamlAsMap() {
}

@JsonIgnore
public Properties getFlinkConfig() {
public Properties getFlinkConfig(Application application) {
String flinkYamlString = DeflaterUtils.unzipString(flinkConf);
Properties flinkConfig = new Properties();
Map<String, String> config = PropertiesUtils.loadFlinkConfYaml(flinkYamlString);
flinkConfig.putAll(config);
for (Map.Entry<String, String> entry : config.entrySet()) {
String value = CommonUtils.fixedValueBaseVar(entry.getValue(), application.getJobName());
flinkConfig.setProperty(entry.getKey(), value);
}
return flinkConfig;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1820,7 +1820,7 @@ private Map<String, Object> getProperties(Application application, FlinkEnv flin

if (ExecutionMode.isKubernetesApplicationMode(application.getExecutionMode())) {
String archiveDir =
flinkEnv.getFlinkConfig().getProperty(JobManagerOptions.ARCHIVE_DIR.key());
flinkEnv.getFlinkConfig(application).getProperty(JobManagerOptions.ARCHIVE_DIR.key());
if (archiveDir != null) {
properties.put(JobManagerOptions.ARCHIVE_DIR.key(), archiveDir);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,8 @@ private void expire(Savepoint entity) {
}

if (cpThreshold == 0) {
String flinkConfNumRetained = flinkEnv.getFlinkConfig().getProperty(numRetainedKey);
String flinkConfNumRetained =
flinkEnv.getFlinkConfig(application).getProperty(numRetainedKey);
int numRetainedDefaultValue = 1;
if (flinkConfNumRetained != null) {
try {
Expand Down Expand Up @@ -292,7 +293,7 @@ public String getSavePointPath(Application appParam) throws Exception {
if (StringUtils.isBlank(savepointPath)) {
// flink
FlinkEnv flinkEnv = flinkEnvService.getById(application.getVersionId());
Properties flinkConfig = flinkEnv.getFlinkConfig();
Properties flinkConfig = flinkEnv.getFlinkConfig(application);
savepointPath =
flinkConfig.getProperty(
CheckpointingOptions.SAVEPOINT_DIRECTORY.key(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ private List<TrackId> getK8sWatchingApps() {

public TrackId toTrackId(Application app) {
FlinkEnv flinkEnv = flinkEnvService.getById(app.getVersionId());
Properties properties = flinkEnv.getFlinkConfig();
Properties properties = flinkEnv.getFlinkConfig(app);

Map<String, String> dynamicProperties =
PropertiesUtils.extractDynamicPropertiesAsJava(app.getDynamicProperties());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import org.apache.streampark.flink.kubernetes.KubernetesRetriever
import org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteMode
import org.apache.streampark.flink.kubernetes.model.ClusterKey

import io.fabric8.kubernetes.api.model.{Config => _}
import org.apache.commons.lang3.StringUtils
import org.apache.flink.client.program.{ClusterClient, PackagedProgram}
import org.apache.flink.configuration._
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,12 @@ trait FlinkClientTrait extends Logger {
commandLine)

val configuration =
applyConfiguration(submitRequest.flinkVersion.flinkHome, activeCommandLine, commandLine)
applyConfiguration(
submitRequest.flinkVersion.flinkHome,
activeCommandLine,
commandLine,
submitRequest.id.toString,
submitRequest.effectiveAppName)

commandLine -> configuration

Expand Down Expand Up @@ -443,17 +448,24 @@ trait FlinkClientTrait extends Logger {
private[this] def applyConfiguration(
flinkHome: String,
activeCustomCommandLine: CustomCommandLine,
commandLine: CommandLine): Configuration = {
commandLine: CommandLine,
jobId: String = null,
jobName: String = null): Configuration = {

require(activeCustomCommandLine != null, "activeCustomCommandLine must not be null.")
val configuration = new Configuration()
val flinkDefaultConfiguration = getFlinkDefaultConfiguration(flinkHome)
flinkDefaultConfiguration.keySet.foreach(
key => {
val value = flinkDefaultConfiguration.getString(key, null)
if (value != null) {
configuration.setString(key, value)
var result = value
if (value != null && StringUtils.isNotBlank(jobName)) {
result = value.replaceAll("\\$\\{job(Name|name)}|\\$job(Name|name)", jobName)
}
if (jobId != null) {
result = result.replaceAll("\\$\\{job(Id|id)}|\\$job(Id|id)", jobId)
}
configuration.setString(key, result)
})
configuration.addAll(activeCustomCommandLine.toConfiguration(commandLine))
configuration
Expand Down

0 comments on commit aaf7c74

Please sign in to comment.