From 2353abe84b2a60fe2dec0485d638a98e20244004 Mon Sep 17 00:00:00 2001 From: chenjie-sau Date: Tue, 27 Oct 2020 10:35:07 +0800 Subject: [PATCH] KYLIN-4736 Upgrade flink version to 1.11.1 (#1462) --- build/bin/download-flink.sh | 13 ++++++------- .../org/apache/kylin/common/util/HadoopUtil.java | 2 ++ .../src/main/resources/kylin-defaults.properties | 5 ++--- .../engine/flink/FlinkOnYarnConfigMapping.java | 16 ++++------------ .../flink/FlinkOnYarnConfigMappingTest.java | 14 +++++++------- pom.xml | 2 +- 6 files changed, 22 insertions(+), 30 deletions(-) diff --git a/build/bin/download-flink.sh b/build/bin/download-flink.sh index 4d2fdb8e594..c118a8ffbc6 100755 --- a/build/bin/download-flink.sh +++ b/build/bin/download-flink.sh @@ -35,12 +35,11 @@ if [[ `uname -a` =~ "Darwin" ]]; then alias md5cmd="md5 -q" fi -flink_version="1.9.2" +flink_version="1.11.1" scala_version="2.11" -flink_shaded_version="10.0" -hadoop_version="2.7.5" -flink_pkg_md5="0718a04fe0a641cc5f5368124a4c54a5" -flink_shaded_hadoop_md5="4287a314bfb09a3dc957cbda3f91d7ca" +flink_shaded_hadoop_version="3.1.1.7.1.1.0-565-9.0" +flink_pkg_md5="3b7aa59b44add1a0625737f6516e0929" +flink_shaded_hadoop_md5="7b78e546dd93f4facd322921f29de1eb" if [ ! -f "flink-${flink_version}-bin-scala_${scala_version}.tgz" ]; then echo "No binary file found, start to download package to ${flink_package_dir}" @@ -53,8 +52,8 @@ else fi fi -flink_shaded_hadoop_jar="flink-shaded-hadoop-2-uber-${hadoop_version}-${flink_shaded_version}.jar" -flink_shaded_hadoop_path="https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/${hadoop_version}-${flink_shaded_version}/${flink_shaded_hadoop_jar}" +flink_shaded_hadoop_jar="flink-shaded-hadoop-3-uber-${flink_shaded_hadoop_version}.jar" +flink_shaded_hadoop_path="https://repository.cloudera.com/artifactory/libs-release-local/org/apache/flink/flink-shaded-hadoop-3-uber/${flink_shaded_hadoop_version}/${flink_shaded_hadoop_jar}" if [ ! -f $flink_shaded_hadoop_jar ]; then echo "Start to download $flink_shaded_hadoop_jar" diff --git a/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java b/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java index 0f6da043cda..26d0ea39192 100644 --- a/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java +++ b/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java @@ -64,12 +64,14 @@ public static Configuration getCurrentConfiguration() { return conf; } Configuration conf = hadoopConfig.get(); + conf.set("fs.hdfs.impl.disable.cache", "true"); return conf; } public static Configuration healSickConfig(Configuration conf) { // https://issues.apache.org/jira/browse/KYLIN-3064 conf.set("yarn.timeline-service.enabled", "false"); + conf.set("fs.hdfs.impl.disable.cache", "true"); return conf; } diff --git a/core-common/src/main/resources/kylin-defaults.properties b/core-common/src/main/resources/kylin-defaults.properties index c16419a9542..ebf1cd27bc9 100644 --- a/core-common/src/main/resources/kylin-defaults.properties +++ b/core-common/src/main/resources/kylin-defaults.properties @@ -352,10 +352,9 @@ kylin.engine.spark-conf-mergedict.spark.memory.fraction=0.2 ### FLINK ENGINE CONFIGS ### ## Flink conf (default is in flink/conf/flink-conf.yaml) -kylin.engine.flink-conf.jobmanager.heap.size=2G -kylin.engine.flink-conf.taskmanager.heap.size=4G +kylin.engine.flink-conf.jobmanager.memory.process.size=2G +kylin.engine.flink-conf.taskmanager.memory.process.size=4G kylin.engine.flink-conf.taskmanager.numberOfTaskSlots=1 -kylin.engine.flink-conf.taskmanager.memory.preallocate=false kylin.engine.flink-conf.job.parallelism=1 kylin.engine.flink-conf.program.enableObjectReuse=false kylin.engine.flink-conf.yarn.queue= diff --git a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkOnYarnConfigMapping.java b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkOnYarnConfigMapping.java index 154d4e2d389..a3d2a65aac4 100644 --- a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkOnYarnConfigMapping.java +++ b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkOnYarnConfigMapping.java @@ -14,13 +14,14 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. -*/ + */ package org.apache.kylin.engine.flink; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.FallbackKey; import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.configuration.MemorySize; import java.util.HashMap; import java.util.Iterator; @@ -38,7 +39,7 @@ public class FlinkOnYarnConfigMapping { flinkOnYarnConfigMap = new HashMap<>(); //mapping job manager heap size -> -yjm - ConfigOption jmHeapSizeOption = JobManagerOptions.JOB_MANAGER_HEAP_MEMORY; + ConfigOption jmHeapSizeOption = JobManagerOptions.TOTAL_PROCESS_MEMORY; flinkOnYarnConfigMap.put(jmHeapSizeOption.key(), "-yjm"); if (jmHeapSizeOption.hasFallbackKeys()) { Iterator deprecatedKeyIterator = jmHeapSizeOption.fallbackKeys().iterator(); @@ -48,7 +49,7 @@ public class FlinkOnYarnConfigMapping { } //mapping task manager heap size -> -ytm - ConfigOption tmHeapSizeOption = TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY; + ConfigOption tmHeapSizeOption = TaskManagerOptions.TOTAL_PROCESS_MEMORY; flinkOnYarnConfigMap.put(tmHeapSizeOption.key(), "-ytm"); if (tmHeapSizeOption.hasFallbackKeys()) { Iterator deprecatedKeyIterator = tmHeapSizeOption.fallbackKeys().iterator(); @@ -66,15 +67,6 @@ public class FlinkOnYarnConfigMapping { } } - ConfigOption tmMemoryPreallocate = TaskManagerOptions.MANAGED_MEMORY_PRE_ALLOCATE; - flinkOnYarnConfigMap.put(tmMemoryPreallocate.key(), "-yD taskmanager.memory.preallocate"); - if (taskSlotNumOption.hasFallbackKeys()) { - Iterator deprecatedKeyIterator = tmMemoryPreallocate.fallbackKeys().iterator(); - while (deprecatedKeyIterator.hasNext()) { - flinkOnYarnConfigMap.put(deprecatedKeyIterator.next().getKey(), "-yD taskmanager.memory.preallocate"); - } - } - //config options do not have mapping with config file key flinkOnYarnConfigMap.put("yarn.queue", "-yqu"); flinkOnYarnConfigMap.put("yarn.nodelabel", "-ynl"); diff --git a/engine-flink/src/test/java/org/apache/kylin/engine/flink/FlinkOnYarnConfigMappingTest.java b/engine-flink/src/test/java/org/apache/kylin/engine/flink/FlinkOnYarnConfigMappingTest.java index 3cb6f28bd39..ca301db3570 100644 --- a/engine-flink/src/test/java/org/apache/kylin/engine/flink/FlinkOnYarnConfigMappingTest.java +++ b/engine-flink/src/test/java/org/apache/kylin/engine/flink/FlinkOnYarnConfigMappingTest.java @@ -14,7 +14,7 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. -*/ + */ package org.apache.kylin.engine.flink; import org.apache.flink.configuration.FallbackKey; @@ -40,10 +40,10 @@ public void testFlinkOnYarnJMMemOption() { String flinkConfigOption = entry.getKey(); boolean matchedAnyOne; - matchedAnyOne = flinkConfigOption.equals(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY.key()); + matchedAnyOne = flinkConfigOption.equals(JobManagerOptions.TOTAL_PROCESS_MEMORY.key()); if (!matchedAnyOne) { - if (JobManagerOptions.JOB_MANAGER_HEAP_MEMORY.hasFallbackKeys()) { - Iterator deprecatedKeyIterator = JobManagerOptions.JOB_MANAGER_HEAP_MEMORY + if (JobManagerOptions.TOTAL_PROCESS_MEMORY.hasFallbackKeys()) { + Iterator deprecatedKeyIterator = JobManagerOptions.TOTAL_PROCESS_MEMORY .fallbackKeys().iterator(); while (deprecatedKeyIterator.hasNext()) { matchedAnyOne = matchedAnyOne && flinkConfigOption.equals(deprecatedKeyIterator.next().getKey()); @@ -65,10 +65,10 @@ public void testFlinkOnYarnTMMemOption() { String flinkConfigOption = entry.getKey(); boolean matchedAnyOne; - matchedAnyOne = flinkConfigOption.equals(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY.key()); + matchedAnyOne = flinkConfigOption.equals(TaskManagerOptions.TOTAL_PROCESS_MEMORY.key()); if (!matchedAnyOne) { - if (TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY.hasFallbackKeys()) { - Iterator deprecatedKeyIterator = TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY + if (TaskManagerOptions.TOTAL_PROCESS_MEMORY.hasFallbackKeys()) { + Iterator deprecatedKeyIterator = TaskManagerOptions.TOTAL_PROCESS_MEMORY .fallbackKeys().iterator(); while (deprecatedKeyIterator.hasNext()) { matchedAnyOne = matchedAnyOne && flinkConfigOption.equals(deprecatedKeyIterator.next().getKey()); diff --git a/pom.xml b/pom.xml index 1c7444e43e3..ea348dc6e70 100644 --- a/pom.xml +++ b/pom.xml @@ -83,7 +83,7 @@ 4.0.0 - 1.9.2 + 1.11.1 5.1.8