-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support the ${jobName} variable in Flink configuration. #4062
Support the ${jobName} variable in Flink configuration. #4062
Conversation
submitRequest.flinkVersion.flinkHome, | ||
activeCommandLine, | ||
commandLine, | ||
submitRequest.appName) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This needs to be replaced with submit request.effectiveAppName
@@ -452,7 +459,7 @@ trait FlinkClientTrait extends Logger { | |||
key => { | |||
val value = flinkDefaultConfiguration.getString(key, null) | |||
if (value != null) { | |||
configuration.setString(key, value) | |||
configuration.setString(key, CommonUtils.fixedValueBaseVar(value, jobName)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
val v = value
.replaceAll("\\$job(Name|name)", jobName)
.replaceAll("\\$\\{job(Name|name)}", jobName)
configuration.setString(key, v)
private CommonUtils() {} | ||
|
||
public static String fixedValueBaseVar(String configValue, String jobName) { | ||
return configValue.replace("${jobName}", jobName); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is not recommended to add a method for this purpose and put it in the public module.
Quality Gate passedIssues Measures |
@@ -115,11 +116,14 @@ public Map<String, String> convertFlinkYamlAsMap() { | |||
} | |||
|
|||
@JsonIgnore | |||
public Properties getFlinkConfig() { | |||
public Properties getFlinkConfig(Application application) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@JsonIgnore
public Properties getFlinkConfig(Application application) {
String flinkYamlString = DeflaterUtils.unzipString(flinkConf);
Properties flinkConfig = new Properties();
Map<String, String> config = PropertiesUtils.loadFlinkConfYaml(flinkYamlString);
for (Map.Entry<String, String> entry : config.entrySet()) {
String value = entry.getValue();
if (StringUtils.isNotBlank(application.getJobName())) {
value =
value.replaceAll("\\$\\{job(Name|name)}|\\$job(Name|name)", application.getJobName());
}
if (application.getId() != null) {
value = value.replaceAll("\\$\\{job(Id|id)}|\\$job(Id|id)", application.getId().toString());
}
flinkConfig.setProperty(entry.getKey(), value);
}
return flinkConfig;
}
What changes were proposed in this pull request
Issue Number: close #3463
Brief change log
Support the ${jobName} variable in Flink configuration files. This makes it easier to differentiate the files for each job, including savepoints, checkpoints, high availability files, etc.
Verifying this change
This change is a trivial rework / code cleanup without any test coverage.
(or)
This change is already covered by existing tests, such as (please describe tests).
(or)
This change added tests and can be verified as follows:
Does this pull request potentially affect one of the following parts