From 38733f8fe63751826b3d99e5ab79f7e795e5c166 Mon Sep 17 00:00:00 2001
From: Steven Wu
Date: Mon, 5 Aug 2024 10:09:36 -0700
Subject: [PATCH] Flink: adjust code for the new 1.20 module; also fix the
 missing jmh setup in the 1.19 module.

---
 .github/workflows/flink-ci.yml                |  9 +----
 flink/build.gradle                            |  9 +++--
 .../shuffle/MapRangePartitionerBenchmark.java | 10 ++++--
 flink/v1.20/build.gradle                      | 36 +++++++++----------
 .../shuffle/MapRangePartitionerBenchmark.java | 10 ++++--
 ...estIcebergSpeculativeExecutionSupport.java |  2 +-
 .../iceberg/flink/util/TestFlinkPackage.java  |  2 +-
 gradle.properties                             |  4 +--
 gradle/libs.versions.toml                     | 24 ++++++-------
 jmh.gradle                                    | 12 +++---
 settings.gradle                               | 18 +++++-----
 11 files changed, 70 insertions(+), 66 deletions(-)
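Notes:

The benchmark changes below track the range partitioner API: the partitioner
now takes a precomputed MapAssignment instead of a MapDataStatistics plus a
downstream-parallelism argument. A minimal sketch of the migration, using the
benchmark's own values (2 partitions, close-file cost weight 0.0; SORT_ORDER
here stands for the "id"-ascending sort order the benchmark builds inline):

    // Before: raw statistics, with the partition count passed separately.
    MapDataStatistics dataStatistics = new MapDataStatistics(mapStatistics);
    MapRangePartitioner partitioner =
        new MapRangePartitioner(SCHEMA, SORT_ORDER, dataStatistics, 2);

    // After: the key-to-subtask assignment is computed up front, and the
    // comparator orders sort keys when forming contiguous key ranges.
    Comparator<StructLike> comparator =
        SortOrderComparators.forSchema(SCHEMA, SORT_ORDER);
    MapAssignment mapAssignment =
        MapAssignment.fromKeyFrequency(2, mapStatistics, 0.0, comparator);
    MapRangePartitioner partitioner =
        new MapRangePartitioner(SCHEMA, SORT_ORDER, mapAssignment);

The Duration.ofSeconds(5) change in TestIcebergSpeculativeExecutionSupport
follows the Flink 1.20 configuration API, which types
JobManagerOptions.SLOT_REQUEST_TIMEOUT as a Duration rather than a long of
milliseconds.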
diff --git a/.github/workflows/flink-ci.yml b/.github/workflows/flink-ci.yml
index 3df36e2be86a..370375783cc2 100644
--- a/.github/workflows/flink-ci.yml
+++ b/.github/workflows/flink-ci.yml
@@ -74,14 +74,7 @@ jobs:
     strategy:
       matrix:
         jvm: [11, 17, 21]
-        flink: ['1.17', '1.18', '1.19']
-        exclude:
-          # Flink 1.17 does not support Java 17.
-          - jvm: 17
-            flink: '1.17'
-          # Flink 1.17 does not support Java 21.
-          - jvm: 21
-            flink: '1.17'
+        flink: ['1.18', '1.19', '1.20']
     env:
       SPARK_LOCAL_IP: localhost
     steps:
diff --git a/flink/build.gradle b/flink/build.gradle
index f049ff69b059..17ed630cc235 100644
--- a/flink/build.gradle
+++ b/flink/build.gradle
@@ -19,11 +19,6 @@
 def flinkVersions = (System.getProperty("flinkVersions") != null ? System.getProperty("flinkVersions") : System.getProperty("defaultFlinkVersions")).split(",")
-
-if (flinkVersions.contains("1.17")) {
-  apply from: file("$projectDir/v1.17/build.gradle")
-}
-
 if (flinkVersions.contains("1.18")) {
   apply from: file("$projectDir/v1.18/build.gradle")
 }
@@ -31,3 +26,7 @@ if (flinkVersions.contains("1.18")) {
 if (flinkVersions.contains("1.19")) {
   apply from: file("$projectDir/v1.19/build.gradle")
 }
+
+if (flinkVersions.contains("1.20")) {
+  apply from: file("$projectDir/v1.20/build.gradle")
+}
\ No newline at end of file
diff --git a/flink/v1.19/flink/src/jmh/java/org/apache/iceberg/flink/sink/shuffle/MapRangePartitionerBenchmark.java b/flink/v1.19/flink/src/jmh/java/org/apache/iceberg/flink/sink/shuffle/MapRangePartitionerBenchmark.java
index c3917165753d..007b423e592a 100644
--- a/flink/v1.19/flink/src/jmh/java/org/apache/iceberg/flink/sink/shuffle/MapRangePartitionerBenchmark.java
+++ b/flink/v1.19/flink/src/jmh/java/org/apache/iceberg/flink/sink/shuffle/MapRangePartitionerBenchmark.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iceberg.flink.sink.shuffle;
 
+import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
@@ -27,6 +28,8 @@
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.SortKey;
 import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.SortOrderComparators;
+import org.apache.iceberg.StructLike;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
@@ -66,6 +69,8 @@ public class MapRangePartitionerBenchmark {
           Types.NestedField.required(9, "name9", Types.StringType.get()));
 
   private static final SortOrder SORT_ORDER = SortOrder.builderFor(SCHEMA).asc("id").build();
+  private static final Comparator<StructLike> SORT_ORDER_COMPARTOR =
+      SortOrderComparators.forSchema(SCHEMA, SORT_ORDER);
   private static final SortKey SORT_KEY = new SortKey(SCHEMA, SORT_ORDER);
 
   private MapRangePartitioner partitioner;
@@ -82,10 +87,11 @@ public void setupBenchmark() {
           mapStatistics.put(sortKey, weight);
         });
 
-    MapDataStatistics dataStatistics = new MapDataStatistics(mapStatistics);
+    MapAssignment mapAssignment =
+        MapAssignment.fromKeyFrequency(2, mapStatistics, 0.0, SORT_ORDER_COMPARTOR);
     this.partitioner =
         new MapRangePartitioner(
-            SCHEMA, SortOrder.builderFor(SCHEMA).asc("id").build(), dataStatistics, 2);
+            SCHEMA, SortOrder.builderFor(SCHEMA).asc("id").build(), mapAssignment);
 
     List<SortKey> keys = Lists.newArrayList(weights.keySet().iterator());
     long[] weightsCDF = new long[keys.size()];
diff --git a/flink/v1.20/build.gradle b/flink/v1.20/build.gradle
index 392a1cb124f0..f2e1fb51a1f4 100644
--- a/flink/v1.20/build.gradle
+++ b/flink/v1.20/build.gradle
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-String flinkMajorVersion = '1.19'
+String flinkMajorVersion = '1.20'
 String scalaVersion = System.getProperty("scalaVersion") != null ? System.getProperty("scalaVersion") : System.getProperty("defaultScalaVersion")
 
 project(":iceberg-flink:iceberg-flink-${flinkMajorVersion}") {
@@ -32,15 +32,15 @@ project(":iceberg-flink:iceberg-flink-${flinkMajorVersion}") {
     implementation project(':iceberg-parquet')
     implementation project(':iceberg-hive-metastore')
 
-    compileOnly libs.flink119.avro
+    compileOnly libs.flink120.avro
     // for dropwizard histogram metrics implementation
-    compileOnly libs.flink119.metrics.dropwizard
-    compileOnly libs.flink119.streaming.java
-    compileOnly "${libs.flink119.streaming.java.get().module}:${libs.flink119.streaming.java.get().getVersion()}:tests"
-    compileOnly libs.flink119.table.api.java.bridge
-    compileOnly "org.apache.flink:flink-table-planner_${scalaVersion}:${libs.versions.flink119.get()}"
-    compileOnly libs.flink119.connector.base
-    compileOnly libs.flink119.connector.files
+    compileOnly libs.flink120.metrics.dropwizard
+    compileOnly libs.flink120.streaming.java
+    compileOnly "${libs.flink120.streaming.java.get().module}:${libs.flink120.streaming.java.get().getVersion()}:tests"
+    compileOnly libs.flink120.table.api.java.bridge
+    compileOnly "org.apache.flink:flink-table-planner_${scalaVersion}:${libs.versions.flink120.get()}"
+    compileOnly libs.flink120.connector.base
+    compileOnly libs.flink120.connector.files
 
     compileOnly libs.hadoop2.hdfs
     compileOnly libs.hadoop2.common
@@ -68,13 +68,13 @@ project(":iceberg-flink:iceberg-flink-${flinkMajorVersion}") {
 
     implementation libs.datasketches
 
-    testImplementation libs.flink119.connector.test.utils
-    testImplementation libs.flink119.core
-    testImplementation libs.flink119.runtime
-    testImplementation(libs.flink119.test.utilsjunit) {
+    testImplementation libs.flink120.connector.test.utils
+    testImplementation libs.flink120.core
+    testImplementation libs.flink120.runtime
+    testImplementation(libs.flink120.test.utilsjunit) {
       exclude group: 'junit'
     }
-    testImplementation(libs.flink119.test.utils) {
+    testImplementation(libs.flink120.test.utils) {
       exclude group: "org.apache.curator", module: 'curator-test'
       exclude group: 'junit'
     }
@@ -168,7 +168,7 @@ project(":iceberg-flink:iceberg-flink-runtime-${flinkMajorVersion}") {
   }
 
   // for dropwizard histogram metrics implementation
-  implementation libs.flink119.metrics.dropwizard
+  implementation libs.flink120.metrics.dropwizard
 
   // for integration testing with the flink-runtime-jar
   // all of those dependencies are required because the integration test extends FlinkTestBase
@@ -178,13 +178,13 @@
     integrationImplementation project(path: ":iceberg-flink:iceberg-flink-${flinkMajorVersion}", configuration: "testArtifacts")
     integrationImplementation project(path: ':iceberg-api', configuration: 'testArtifacts')
     integrationImplementation project(path: ':iceberg-hive-metastore', configuration: 'testArtifacts')
-    integrationImplementation(libs.flink119.test.utils) {
+    integrationImplementation(libs.flink120.test.utils) {
       exclude group: "org.apache.curator", module: 'curator-test'
       exclude group: 'junit'
     }
 
-    integrationImplementation libs.flink119.table.api.java.bridge
-    integrationImplementation "org.apache.flink:flink-table-planner_${scalaVersion}:${libs.versions.flink119.get()}"
+    integrationImplementation libs.flink120.table.api.java.bridge
+    integrationImplementation "org.apache.flink:flink-table-planner_${scalaVersion}:${libs.versions.flink120.get()}"
 
     integrationImplementation libs.hadoop2.common
     integrationImplementation libs.hadoop2.hdfs
diff --git a/flink/v1.20/flink/src/jmh/java/org/apache/iceberg/flink/sink/shuffle/MapRangePartitionerBenchmark.java b/flink/v1.20/flink/src/jmh/java/org/apache/iceberg/flink/sink/shuffle/MapRangePartitionerBenchmark.java
index c3917165753d..007b423e592a 100644
--- a/flink/v1.20/flink/src/jmh/java/org/apache/iceberg/flink/sink/shuffle/MapRangePartitionerBenchmark.java
+++ b/flink/v1.20/flink/src/jmh/java/org/apache/iceberg/flink/sink/shuffle/MapRangePartitionerBenchmark.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iceberg.flink.sink.shuffle;
 
+import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
@@ -27,6 +28,8 @@
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.SortKey;
 import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.SortOrderComparators;
+import org.apache.iceberg.StructLike;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
@@ -66,6 +69,8 @@ public class MapRangePartitionerBenchmark {
           Types.NestedField.required(9, "name9", Types.StringType.get()));
 
   private static final SortOrder SORT_ORDER = SortOrder.builderFor(SCHEMA).asc("id").build();
+  private static final Comparator<StructLike> SORT_ORDER_COMPARTOR =
+      SortOrderComparators.forSchema(SCHEMA, SORT_ORDER);
   private static final SortKey SORT_KEY = new SortKey(SCHEMA, SORT_ORDER);
 
   private MapRangePartitioner partitioner;
@@ -82,10 +87,11 @@ public void setupBenchmark() {
           mapStatistics.put(sortKey, weight);
         });
 
-    MapDataStatistics dataStatistics = new MapDataStatistics(mapStatistics);
+    MapAssignment mapAssignment =
+        MapAssignment.fromKeyFrequency(2, mapStatistics, 0.0, SORT_ORDER_COMPARTOR);
     this.partitioner =
         new MapRangePartitioner(
-            SCHEMA, SortOrder.builderFor(SCHEMA).asc("id").build(), dataStatistics, 2);
+            SCHEMA, SortOrder.builderFor(SCHEMA).asc("id").build(), mapAssignment);
 
     List<SortKey> keys = Lists.newArrayList(weights.keySet().iterator());
     long[] weightsCDF = new long[keys.size()];
diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSpeculativeExecutionSupport.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSpeculativeExecutionSupport.java
index 41b023b93617..992b712d9d69 100644
--- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSpeculativeExecutionSupport.java
+++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSpeculativeExecutionSupport.java
@@ -165,7 +165,7 @@ private static Configuration configure() {
     Configuration configuration = new Configuration();
     configuration.set(CoreOptions.CHECK_LEAKED_CLASSLOADER, false);
     configuration.set(RestOptions.BIND_PORT, "0");
-    configuration.set(JobManagerOptions.SLOT_REQUEST_TIMEOUT, 5000L);
+    configuration.set(JobManagerOptions.SLOT_REQUEST_TIMEOUT, Duration.ofSeconds(5));
 
     // Use FLIP-27 source
     configuration.set(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_FLIP27_SOURCE, true);
diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/util/TestFlinkPackage.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/util/TestFlinkPackage.java
index 4ba4f9d983dc..65f21f7d050c 100644
--- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/util/TestFlinkPackage.java
+++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/util/TestFlinkPackage.java
@@ -29,7 +29,7 @@ public class TestFlinkPackage {
   /** This unit test would need to be adjusted as new Flink version is supported. */
   @Test
   public void testVersion() {
-    assertThat(FlinkPackage.version()).isEqualTo("1.19.0");
+    assertThat(FlinkPackage.version()).isEqualTo("1.20.0");
   }
 
   @Test
diff --git a/gradle.properties b/gradle.properties
index c6b8dec17bc5..fcbe7d8de012 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -16,8 +16,8 @@
 jmhOutputPath=build/reports/jmh/human-readable-output.txt
 jmhJsonOutputPath=build/reports/jmh/results.json
 jmhIncludeRegex=.*
-systemProp.defaultFlinkVersions=1.19
-systemProp.knownFlinkVersions=1.17,1.18,1.19
+systemProp.defaultFlinkVersions=1.20
+systemProp.knownFlinkVersions=1.18,1.19,1.20
 systemProp.defaultHiveVersions=2
 systemProp.knownHiveVersions=2,3
 systemProp.defaultSparkVersions=3.5
diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
index 12caeda95407..77e610e885f6 100644
--- a/gradle/libs.versions.toml
+++ b/gradle/libs.versions.toml
@@ -39,9 +39,9 @@ delta-spark = "3.2.0"
 esotericsoftware-kryo = "4.0.3"
 errorprone-annotations = "2.29.2"
 findbugs-jsr305 = "3.0.2"
-flink117 = { strictly = "1.17.2"}
 flink118 = { strictly = "1.18.1"}
 flink119 = { strictly = "1.19.0"}
+flink120 = { strictly = "1.20.0"}
 google-libraries-bom = "26.43.0"
 guava = "33.2.1-jre"
 hadoop2 = "2.7.3"
@@ -108,12 +108,6 @@ datasketches = { module = "org.apache.datasketches:datasketches-java", version.r
 delta-standalone = { module = "io.delta:delta-standalone_2.12", version.ref = "delta-standalone" }
 errorprone-annotations = { module = "com.google.errorprone:error_prone_annotations", version.ref = "errorprone-annotations" }
 findbugs-jsr305 = { module = "com.google.code.findbugs:jsr305", version.ref = "findbugs-jsr305" }
-flink117-avro = { module = "org.apache.flink:flink-avro", version.ref = "flink117" }
-flink117-connector-base = { module = "org.apache.flink:flink-connector-base", version.ref = "flink117" }
-flink117-connector-files = { module = "org.apache.flink:flink-connector-files", version.ref = "flink117" }
-flink117-metrics-dropwizard = { module = "org.apache.flink:flink-metrics-dropwizard", version.ref = "flink117" }
-flink117-streaming-java = { module = "org.apache.flink:flink-streaming-java", version.ref = "flink117" }
-flink117-table-api-java-bridge = { module = "org.apache.flink:flink-table-api-java-bridge", version.ref = "flink117" }
 flink118-avro = { module = "org.apache.flink:flink-avro", version.ref = "flink118" }
 flink118-connector-base = { module = "org.apache.flink:flink-connector-base", version.ref = "flink118" }
"org.apache.flink:flink-connector-files", version.ref = "flink118" } @@ -126,6 +120,12 @@ flink119-connector-files = { module = "org.apache.flink:flink-connector-files", flink119-metrics-dropwizard = { module = "org.apache.flink:flink-metrics-dropwizard", version.ref = "flink119" } flink119-streaming-java = { module = "org.apache.flink:flink-streaming-java", version.ref = "flink119" } flink119-table-api-java-bridge = { module = "org.apache.flink:flink-table-api-java-bridge", version.ref = "flink119" } +flink120-avro = { module = "org.apache.flink:flink-avro", version.ref = "flink120" } +flink120-connector-base = { module = "org.apache.flink:flink-connector-base", version.ref = "flink120" } +flink120-connector-files = { module = "org.apache.flink:flink-connector-files", version.ref = "flink120" } +flink120-metrics-dropwizard = { module = "org.apache.flink:flink-metrics-dropwizard", version.ref = "flink120" } +flink120-streaming-java = { module = "org.apache.flink:flink-streaming-java", version.ref = "flink120" } +flink120-table-api-java-bridge = { module = "org.apache.flink:flink-table-api-java-bridge", version.ref = "flink120" } google-libraries-bom = { module = "com.google.cloud:libraries-bom", version.ref = "google-libraries-bom" } guava-guava = { module = "com.google.guava:guava", version.ref = "guava" } hadoop2-client = { module = "org.apache.hadoop:hadoop-client", version.ref = "hadoop2" } @@ -180,11 +180,6 @@ assertj-core = { module = "org.assertj:assertj-core", version.ref = "assertj-cor awaitility = { module = "org.awaitility:awaitility", version.ref = "awaitility" } delta-spark = { module = "io.delta:delta-spark_2.12", version.ref = "delta-spark" } esotericsoftware-kryo = { module = "com.esotericsoftware:kryo", version.ref = "esotericsoftware-kryo" } -flink117-connector-test-utils = { module = "org.apache.flink:flink-connector-test-utils", version.ref = "flink117" } -flink117-core = { module = "org.apache.flink:flink-core", version.ref = "flink117" } -flink117-runtime = { module = "org.apache.flink:flink-runtime", version.ref = "flink117" } -flink117-test-utils = { module = "org.apache.flink:flink-test-utils", version.ref = "flink117" } -flink117-test-utilsjunit = { module = "org.apache.flink:flink-test-utils-junit", version.ref = "flink117" } flink118-connector-test-utils = { module = "org.apache.flink:flink-connector-test-utils", version.ref = "flink118" } flink118-core = { module = "org.apache.flink:flink-core", version.ref = "flink118" } flink118-runtime = { module = "org.apache.flink:flink-runtime", version.ref = "flink118" } @@ -195,6 +190,11 @@ flink119-core = { module = "org.apache.flink:flink-core", version.ref = "flink11 flink119-runtime = { module = "org.apache.flink:flink-runtime", version.ref = "flink119" } flink119-test-utils = { module = "org.apache.flink:flink-test-utils", version.ref = "flink119" } flink119-test-utilsjunit = { module = "org.apache.flink:flink-test-utils-junit", version.ref = "flink119" } +flink120-connector-test-utils = { module = "org.apache.flink:flink-connector-test-utils", version.ref = "flink120" } +flink120-core = { module = "org.apache.flink:flink-core", version.ref = "flink120" } +flink120-runtime = { module = "org.apache.flink:flink-runtime", version.ref = "flink120" } +flink120-test-utils = { module = "org.apache.flink:flink-test-utils", version.ref = "flink120" } +flink120-test-utilsjunit = { module = "org.apache.flink:flink-test-utils-junit", version.ref = "flink120" } guava-testlib = { module = "com.google.guava:guava-testlib", 
version.ref = "guava" } jakarta-el-api = { module = "jakarta.el:jakarta.el-api", version.ref = "jakarta-el-api" } jakarta-servlet = {module = "jakarta.servlet:jakarta.servlet-api", version.ref = "jakarta-servlet-api"} diff --git a/jmh.gradle b/jmh.gradle index 5e5e0151219f..a5d8d624270d 100644 --- a/jmh.gradle +++ b/jmh.gradle @@ -26,16 +26,16 @@ def sparkVersions = (System.getProperty("sparkVersions") != null ? System.getPro def scalaVersion = System.getProperty("scalaVersion") != null ? System.getProperty("scalaVersion") : System.getProperty("defaultScalaVersion") def jmhProjects = [project(":iceberg-core"), project(":iceberg-data")] -if (flinkVersions.contains("1.16")) { - jmhProjects.add(project(":iceberg-flink:iceberg-flink-1.16")) +if (flinkVersions.contains("1.18")) { + jmhProjects.add(project(":iceberg-flink:iceberg-flink-1.18")) } -if (flinkVersions.contains("1.17")) { - jmhProjects.add(project(":iceberg-flink:iceberg-flink-1.17")) +if (flinkVersions.contains("1.19")) { + jmhProjects.add(project(":iceberg-flink:iceberg-flink-1.19")) } -if (flinkVersions.contains("1.18")) { - jmhProjects.add(project(":iceberg-flink:iceberg-flink-1.18")) +if (flinkVersions.contains("1.20")) { + jmhProjects.add(project(":iceberg-flink:iceberg-flink-1.20")) } if (sparkVersions.contains("3.3")) { diff --git a/settings.gradle b/settings.gradle index cdc69b0e2071..1e6d92bf1e1f 100644 --- a/settings.gradle +++ b/settings.gradle @@ -112,15 +112,6 @@ if (!flinkVersions.isEmpty()) { project(':flink').name = 'iceberg-flink' } -if (flinkVersions.contains("1.17")) { - include ":iceberg-flink:flink-1.17" - include ":iceberg-flink:flink-runtime-1.17" - project(":iceberg-flink:flink-1.17").projectDir = file('flink/v1.17/flink') - project(":iceberg-flink:flink-1.17").name = "iceberg-flink-1.17" - project(":iceberg-flink:flink-runtime-1.17").projectDir = file('flink/v1.17/flink-runtime') - project(":iceberg-flink:flink-runtime-1.17").name = "iceberg-flink-runtime-1.17" -} - if (flinkVersions.contains("1.18")) { include ":iceberg-flink:flink-1.18" include ":iceberg-flink:flink-runtime-1.18" @@ -139,6 +130,15 @@ if (flinkVersions.contains("1.19")) { project(":iceberg-flink:flink-runtime-1.19").name = "iceberg-flink-runtime-1.19" } +if (flinkVersions.contains("1.20")) { + include ":iceberg-flink:flink-1.20" + include ":iceberg-flink:flink-runtime-1.20" + project(":iceberg-flink:flink-1.20").projectDir = file('flink/v1.20/flink') + project(":iceberg-flink:flink-1.20").name = "iceberg-flink-1.20" + project(":iceberg-flink:flink-runtime-1.20").projectDir = file('flink/v1.20/flink-runtime') + project(":iceberg-flink:flink-runtime-1.20").name = "iceberg-flink-runtime-1.20" +} + if (sparkVersions.contains("3.3")) { include ":iceberg-spark:spark-3.3_${scalaVersion}" include ":iceberg-spark:spark-extensions-3.3_${scalaVersion}"