Skip to content

Commit

Permalink
[AMORO-3087]: Optimizer Support Flink 1.20 (#3087) (#3090)
Browse files Browse the repository at this point in the history
* [AMORO-3087]: Optimizer Support Flink 1.20 (#3087)

* fix docs

* fix

* docker images change flink version to 1.20

---------

Co-authored-by: ConradJam <[email protected]>
  • Loading branch information
czy006 and ConradJam authored Aug 14, 2024
1 parent edc672e commit 4f51018
Show file tree
Hide file tree
Showing 9 changed files with 179 additions and 52 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/docker-images.yml
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ jobs:
if: ${{ startsWith(github.repository, 'apache/') }}
strategy:
matrix:
flink: [ "1.14.6", "1.18.1" ]
flink: [ "1.14.6", "1.20.0" ]
steps:
- uses: actions/checkout@v3
- name: Set up JDK 8
Expand All @@ -129,11 +129,11 @@ jobs:
tags: |
type=ref,event=branch,enable=${{ matrix.flink == '1.14.6' }},suffix=-snapshot
type=ref,event=branch,enable=${{ matrix.flink == '1.14.6' }},suffix=-snapshot-flink1.14
type=ref,event=branch,enable=${{ matrix.flink == '1.18.1' }},suffix=-snapshot-flink1.18
type=ref,event=branch,enable=${{ matrix.flink == '1.20.0' }},suffix=-snapshot-flink1.20
type=raw,enable=${{ matrix.flink == '1.14.6' && startsWith(github.ref, 'refs/tags/v') }},value=latest
type=semver,enable=${{ matrix.flink == '1.14.6' }},pattern={{version}}
type=semver,enable=${{ matrix.flink == '1.14.6' }},pattern={{version}}, suffix=-flink1.14
type=semver,enable=${{ matrix.flink == '1.18.1' }},pattern={{version}}, suffix=-flink1.18
type=semver,enable=${{ matrix.flink == '1.20.0' }},pattern={{version}}, suffix=-flink1.20
- name: Print tags
run: echo '${{ steps.meta.outputs.tags }}'
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ Amoro is built using Maven with JDK 8 and JDK 17(only for `amoro-mixed-format/am
* Build and skip dashboard: `mvn clean package -Pskip-dashboard-build`
* Build and disable disk storage, RocksDB will NOT be introduced to avoid memory overflow: `mvn clean package -DskipTests -Pno-extented-disk-storage`
* Build with hadoop 2.x(the default is 3.x) dependencies: `mvn clean package -DskipTests -Phadoop2`
* Specify Flink version for Flink optimizer(the default is 1.18.1): `mvn clean package -DskipTests -Dflink-optimizer.flink-version=1.15.4`
* Specify Flink version for Flink optimizer (the default is 1.20.0): `mvn clean package -DskipTests -Dflink-optimizer.flink-version=1.18.1`
* If the version of Flink is below 1.15.0, you also need to add the `-Pflink-optimizer-pre-1.15` parameter: `mvn clean package -DskipTests -Pflink-optimizer-pre-1.15 -Dflink-optimizer.flink-version=1.14.6`
* Specify Spark version for Spark optimizer (the default is 3.3.3): `mvn clean package -DskipTests -Dspark-optimizer.spark-version=3.3.3`
* Build `amoro-mixed-format-trino` module under JDK 17: `mvn clean package -DskipTests -Pformat-mixed-format-trino,build-mixed-format-trino -pl 'amoro-mixed-format/amoro-mixed-format-trino' -am`.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@
import org.apache.flink.api.common.JobID;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.RestoreMode;
import org.apache.flink.configuration.YamlParserUtils;
import org.apache.flink.core.execution.RestoreMode;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.rest.FileUpload;
import org.apache.flink.runtime.rest.RestClient;
Expand All @@ -60,10 +61,13 @@
import java.io.UncheckedIOException;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
Expand All @@ -79,26 +83,16 @@ public class FlinkOptimizerContainer extends AbstractResourceContainer {

public static final String FLINK_HOME_PROPERTY = "flink-home";
public static final String FLINK_CONFIG_PATH = "/conf";
public static final String FLINK_CONFIG_YAML = "/flink-conf.yaml";
public static final String LEGACY_FLINK_CONFIG_YAML = "/flink-conf.yaml";
// flink version >= 1.20 use it first
public static final String FLINK_CONFIG_YAML = "/config.yaml";
public static final String ENV_FLINK_CONF_DIR = "FLINK_CONF_DIR";
public static final String FLINK_CLIENT_TIMEOUT_SECOND = "flink-client-timeout-second";

private static final String DEFAULT_JOB_URI = "/plugin/optimizer/flink/optimizer-job.jar";
private static final String FLINK_JOB_MAIN_CLASS =
"org.apache.amoro.optimizer.flink.FlinkOptimizer";

/**
* This will be removed in 0.7.0, using flink properties
* `flink-conf.taskmanager.memory.process.size`.
*/
@Deprecated public static final String TASK_MANAGER_MEMORY_PROPERTY = "taskmanager.memory";

/**
* This will be removed in 0.7.0, using flink properties
* `flink-conf.jobmanager.memory.process.size`.
*/
@Deprecated public static final String JOB_MANAGER_MEMORY_PROPERTY = "jobmanager.memory";

public static final String FLINK_RUN_TARGET = "target";
public static final String FLINK_JOB_URI = "job-uri";

Expand Down Expand Up @@ -228,16 +222,10 @@ protected String buildOptimizerStartupArgsString(Resource resource) {

long jobManagerMemory =
getMemorySizeValue(
properties,
resourceFlinkConf,
JOB_MANAGER_MEMORY_PROPERTY,
FlinkConfKeys.JOB_MANAGER_TOTAL_PROCESS_MEMORY);
properties, resourceFlinkConf, FlinkConfKeys.JOB_MANAGER_TOTAL_PROCESS_MEMORY);
long taskManagerMemory =
getMemorySizeValue(
properties,
resourceFlinkConf,
TASK_MANAGER_MEMORY_PROPERTY,
FlinkConfKeys.TASK_MANAGER_TOTAL_PROCESS_MEMORY);
properties, resourceFlinkConf, FlinkConfKeys.TASK_MANAGER_TOTAL_PROCESS_MEMORY);

resourceFlinkConf.putToOptions(
FlinkConfKeys.JOB_MANAGER_TOTAL_PROCESS_MEMORY, jobManagerMemory + "m");
Expand Down Expand Up @@ -266,15 +254,62 @@ protected String buildOptimizerStartupArgsString(Resource resource) {
jobArgs);
}

@VisibleForTesting
protected Map<String, String> loadFlinkConfigForYAML(URL path) {
this.flinkConfDir = Paths.get(path.getPath()).getParent().toString();
return loadFlinkConfig();
}

/**
 * Get the Flink configuration, preferring config.yaml (Flink >= 1.20) and falling back to the
 * legacy flink-conf.yaml; see <a
 * href="https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/deployment/config/#flink-configuration-file">the
 * Flink configuration file documentation</a>.
 *
 * @return flink config map
 */
private Map<String, String> loadFlinkConfig() {
try {
return new Yaml().load(Files.newInputStream(Paths.get(flinkConfDir + FLINK_CONFIG_YAML)));
} catch (IOException e) {
Path flinkConfPath = Paths.get(flinkConfDir + FLINK_CONFIG_YAML);
if (!Files.exists(flinkConfPath, LinkOption.NOFOLLOW_LINKS)) {
flinkConfPath = Paths.get(flinkConfDir + LEGACY_FLINK_CONFIG_YAML);
return new Yaml().load(Files.newInputStream(flinkConfPath));
}
Map<String, Object> configDocument =
YamlParserUtils.loadYamlFile(new File(flinkConfPath.toUri()));
return Maps.transformValues(
flatten(configDocument, ""), value -> value == null ? null : value.toString());
} catch (Exception e) {
LOG.error("load flink conf yaml failed: {}", e.getMessage());
return Collections.emptyMap();
}
}

/**
 * Flattens a nested configuration map into dot-separated keys. Copied from the Flink 1.20
 * {@code GlobalConfiguration#flatten} utility.
 *
 * @param config the (possibly nested) configuration map to flatten
 * @param keyPrefix the prefix prepended to every key at this nesting level
 * @return a flat map whose keys are dot-joined paths into the original map
 */
private static Map<String, Object> flatten(Map<String, Object> config, String keyPrefix) {
final Map<String, Object> flattenedMap = new HashMap<>();
config.forEach(
(key, value) -> {
String flattenedKey = keyPrefix + key;
if (value instanceof Map) {
Map<String, Object> e = (Map<String, Object>) value;
flattenedMap.putAll(flatten(e, flattenedKey + "."));
} else {
if (value instanceof List) {
flattenedMap.put(flattenedKey, YamlParserUtils.toYAMLString(value));
} else {
flattenedMap.put(flattenedKey, value);
}
}
});

return flattenedMap;
}

private void addKubernetesProperties(Resource resource, FlinkConf flinkConf) {
String clusterId = kubernetesClusterId(resource);
flinkConf.putToOptions(FlinkConfKeys.KUBERNETES_CLUSTER_ID, clusterId);
Expand All @@ -297,16 +332,12 @@ private void addYarnProperties(FlinkConf flinkConf) {

/**
* get jobManager and taskManager memory. An example of using Jobmanager memory parameters is as
* follows: jobmanager.memory: 1024 flink-conf.jobmanager.memory.process.size: 1024M
* flink-conf.yaml Prioritize from high to low.
* flink-conf.jobmanager.memory.process.size: 1024M flink-conf.yaml Prioritize from high to low.
*/
@VisibleForTesting
protected long getMemorySizeValue(
Map<String, String> resourceProperties,
FlinkConf conf,
String resourcePropertyKey,
String flinkConfKey) {
String value = resourceProperties.get(resourcePropertyKey);
Map<String, String> resourceProperties, FlinkConf conf, String flinkConfKey) {
String value = resourceProperties.get(flinkConfKey);
if (value == null) {
value = conf.configValue(flinkConfKey);
}
Expand Down Expand Up @@ -532,7 +563,7 @@ protected JobID runJar(String jarId, Configuration configuration, Resource resou
JobID jobID = JobID.generate();
JarRunRequestBody runRequestBody =
new JarRunRequestBody(
FLINK_JOB_MAIN_CLASS, args, null, null, jobID, true, null, RestoreMode.DEFAULT, null);
FLINK_JOB_MAIN_CLASS, args, null, null, jobID, true, null, RestoreMode.NO_CLAIM, null);
LOG.info("Submitting job: {} to session cluster, args: {}", jobID, args);
try (RestClusterClient<String> restClusterClient =
FlinkClientUtil.getRestClusterClient(configuration)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,11 @@
import org.junit.Assert;
import org.junit.Test;

import java.net.URL;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class TestFlinkOptimizerContainer {
FlinkOptimizerContainer container = new FlinkOptimizerContainer();
Expand Down Expand Up @@ -52,6 +55,28 @@ public void testParseMemorySize() {
Assert.assertEquals(0, container.parseMemorySize("100kb"));
}

@Test
public void testReadFlinkConfigFile() {
ClassLoader classLoader = getClass().getClassLoader();
URL flinkConfResourceUrl = classLoader.getResource("flink-conf.yaml");
Assert.assertEquals(
Paths.get(Objects.requireNonNull(flinkConfResourceUrl).getPath()).getFileName().toString(),
"flink-conf.yaml");
URL newFlinkConfResourceUrl = classLoader.getResource("config.yaml");
Assert.assertEquals(
Paths.get(Objects.requireNonNull(newFlinkConfResourceUrl).getPath())
.getFileName()
.toString(),
"config.yaml");
Map<String, String> flinkConfig = container.loadFlinkConfigForYAML(newFlinkConfResourceUrl);
Assert.assertEquals(
flinkConfig.get(FlinkOptimizerContainer.FlinkConfKeys.TASK_MANAGER_TOTAL_PROCESS_MEMORY),
"1728m");
Assert.assertEquals(
flinkConfig.get(FlinkOptimizerContainer.FlinkConfKeys.JOB_MANAGER_TOTAL_PROCESS_MEMORY),
"1600m");
}

@Test
public void testBuildFlinkOptions() {
Map<String, String> containerProperties = Maps.newHashMap(this.containerProperties);
Expand All @@ -78,20 +103,20 @@ public void testBuildFlinkOptions() {
@Test
public void testGetMemorySizeValue() {
HashMap<String, String> prop = new HashMap<>();
prop.put("taskmanager.memory", "100");
prop.put("jobmanager.memory", "100");
prop.put(FlinkOptimizerContainer.FlinkConfKeys.TASK_MANAGER_TOTAL_PROCESS_MEMORY, "100");
prop.put(FlinkOptimizerContainer.FlinkConfKeys.JOB_MANAGER_TOTAL_PROCESS_MEMORY, "100");

FlinkOptimizerContainer.FlinkConf conf =
FlinkOptimizerContainer.FlinkConf.buildFor(prop, Maps.newHashMap()).build();

Assert.assertEquals(
100L,
container.getMemorySizeValue(
prop, conf, "taskmanager.memory", "taskmanager.memory.process.size"));
prop, conf, FlinkOptimizerContainer.FlinkConfKeys.TASK_MANAGER_TOTAL_PROCESS_MEMORY));
Assert.assertEquals(
100L,
container.getMemorySizeValue(
prop, conf, "jobmanager.memory", "jobmanager.memory.process.size"));
prop, conf, FlinkOptimizerContainer.FlinkConfKeys.JOB_MANAGER_TOTAL_PROCESS_MEMORY));

Map<String, String> containerProperties = Maps.newHashMap();
containerProperties.put("flink-conf.jobmanager.memory.process.size", "200 M");
Expand All @@ -101,36 +126,36 @@ public void testGetMemorySizeValue() {
Assert.assertEquals(
200L,
container.getMemorySizeValue(
prop, conf, "taskmanager.memory", "taskmanager.memory.process.size"));
prop, conf, FlinkOptimizerContainer.FlinkConfKeys.TASK_MANAGER_TOTAL_PROCESS_MEMORY));
Assert.assertEquals(
200L,
container.getMemorySizeValue(
prop, conf, "jobmanager.memory", "jobmanager.memory.process.size"));
prop, conf, FlinkOptimizerContainer.FlinkConfKeys.JOB_MANAGER_TOTAL_PROCESS_MEMORY));

prop.clear();
containerProperties = Maps.newHashMap();
conf = FlinkOptimizerContainer.FlinkConf.buildFor(prop, containerProperties).build();

prop.put("taskmanager.memory", "300 M");
prop.put("jobmanager.memory", "300");
prop.put(FlinkOptimizerContainer.FlinkConfKeys.TASK_MANAGER_TOTAL_PROCESS_MEMORY, "300 M");
prop.put(FlinkOptimizerContainer.FlinkConfKeys.JOB_MANAGER_TOTAL_PROCESS_MEMORY, "300");
Assert.assertEquals(
300L,
container.getMemorySizeValue(
prop, conf, "taskmanager.memory", "taskmanager.memory.process.size"));
prop, conf, FlinkOptimizerContainer.FlinkConfKeys.TASK_MANAGER_TOTAL_PROCESS_MEMORY));
Assert.assertEquals(
300L,
container.getMemorySizeValue(
prop, conf, "jobmanager.memory", "jobmanager.memory.process.size"));
prop, conf, FlinkOptimizerContainer.FlinkConfKeys.JOB_MANAGER_TOTAL_PROCESS_MEMORY));

conf = FlinkOptimizerContainer.FlinkConf.buildFor(Maps.newHashMap(), Maps.newHashMap()).build();
prop.clear();
Assert.assertEquals(
0L,
container.getMemorySizeValue(
prop, conf, "taskmanager.memory", "taskmanager.memory.process.size"));
prop, conf, FlinkOptimizerContainer.FlinkConfKeys.TASK_MANAGER_TOTAL_PROCESS_MEMORY));
Assert.assertEquals(
0L,
container.getMemorySizeValue(
prop, conf, "jobmanager.memory", "jobmanager.memory.process.size"));
prop, conf, FlinkOptimizerContainer.FlinkConfKeys.JOB_MANAGER_TOTAL_PROCESS_MEMORY));
}
}
39 changes: 39 additions & 0 deletions amoro-ams/amoro-ams-server/src/test/resources/config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

jobmanager:
bind-host: localhost
rpc:
address: localhost
port: 6123
memory:
process:
size: 1600m
execution:
failover-strategy: region

taskmanager:
bind-host: localhost
host: localhost
numberOfTaskSlots: 1
memory:
process:
size: 1728m

parallelism:
default: 1
32 changes: 32 additions & 0 deletions amoro-ams/amoro-ams-server/src/test/resources/flink-conf.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

jobmanager.rpc.address: localhost
jobmanager.rpc.port: 6123
jobmanager.bind-host: localhost
jobmanager.memory.process.size: 1600m

taskmanager.bind-host: localhost
taskmanager.host: localhost
taskmanager.memory.process.size: 1728m
taskmanager.numberOfTaskSlots: 1
parallelism.default: 2

jobmanager.execution.failover-strategy: region
rest.address: localhost
rest.bind-address: localhost
Loading

0 comments on commit 4f51018

Please sign in to comment.