Skip to content

Commit

Permalink
Flink: adjust code for the new 1.20 module.
Browse files Browse the repository at this point in the history
Also fixes a bug where the 1.19 module was missing from the jmh configuration.
  • Loading branch information
stevenzwu committed Aug 6, 2024
1 parent 0d8f2c4 commit 38733f8
Show file tree
Hide file tree
Showing 11 changed files with 70 additions and 66 deletions.
9 changes: 1 addition & 8 deletions .github/workflows/flink-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -74,14 +74,7 @@ jobs:
strategy:
matrix:
jvm: [11, 17, 21]
flink: ['1.17', '1.18', '1.19']
exclude:
# Flink 1.17 does not support Java 17.
- jvm: 17
flink: '1.17'
# Flink 1.17 does not support Java 21.
- jvm: 21
flink: '1.17'
flink: ['1.18', '1.19', '1.20']
env:
SPARK_LOCAL_IP: localhost
steps:
Expand Down
9 changes: 4 additions & 5 deletions flink/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,14 @@

def flinkVersions = (System.getProperty("flinkVersions") != null ? System.getProperty("flinkVersions") : System.getProperty("defaultFlinkVersions")).split(",")


if (flinkVersions.contains("1.17")) {
apply from: file("$projectDir/v1.17/build.gradle")
}

if (flinkVersions.contains("1.18")) {
apply from: file("$projectDir/v1.18/build.gradle")
}

if (flinkVersions.contains("1.19")) {
apply from: file("$projectDir/v1.19/build.gradle")
}

if (flinkVersions.contains("1.20")) {
apply from: file("$projectDir/v1.20/build.gradle")
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.iceberg.flink.sink.shuffle;

import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
Expand All @@ -27,6 +28,8 @@
import org.apache.iceberg.Schema;
import org.apache.iceberg.SortKey;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.SortOrderComparators;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
Expand Down Expand Up @@ -66,6 +69,8 @@ public class MapRangePartitionerBenchmark {
Types.NestedField.required(9, "name9", Types.StringType.get()));

private static final SortOrder SORT_ORDER = SortOrder.builderFor(SCHEMA).asc("id").build();
private static final Comparator<StructLike> SORT_ORDER_COMPARATOR =
SortOrderComparators.forSchema(SCHEMA, SORT_ORDER);
private static final SortKey SORT_KEY = new SortKey(SCHEMA, SORT_ORDER);

private MapRangePartitioner partitioner;
Expand All @@ -82,10 +87,11 @@ public void setupBenchmark() {
mapStatistics.put(sortKey, weight);
});

MapDataStatistics dataStatistics = new MapDataStatistics(mapStatistics);
MapAssignment mapAssignment =
MapAssignment.fromKeyFrequency(2, mapStatistics, 0.0, SORT_ORDER_COMPARATOR);
this.partitioner =
new MapRangePartitioner(
SCHEMA, SortOrder.builderFor(SCHEMA).asc("id").build(), dataStatistics, 2);
SCHEMA, SortOrder.builderFor(SCHEMA).asc("id").build(), mapAssignment);

List<Integer> keys = Lists.newArrayList(weights.keySet().iterator());
long[] weightsCDF = new long[keys.size()];
Expand Down
36 changes: 18 additions & 18 deletions flink/v1.20/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* under the License.
*/

String flinkMajorVersion = '1.19'
String flinkMajorVersion = '1.20'
String scalaVersion = System.getProperty("scalaVersion") != null ? System.getProperty("scalaVersion") : System.getProperty("defaultScalaVersion")

project(":iceberg-flink:iceberg-flink-${flinkMajorVersion}") {
Expand All @@ -32,15 +32,15 @@ project(":iceberg-flink:iceberg-flink-${flinkMajorVersion}") {
implementation project(':iceberg-parquet')
implementation project(':iceberg-hive-metastore')

compileOnly libs.flink119.avro
compileOnly libs.flink120.avro
// for dropwizard histogram metrics implementation
compileOnly libs.flink119.metrics.dropwizard
compileOnly libs.flink119.streaming.java
compileOnly "${libs.flink119.streaming.java.get().module}:${libs.flink119.streaming.java.get().getVersion()}:tests"
compileOnly libs.flink119.table.api.java.bridge
compileOnly "org.apache.flink:flink-table-planner_${scalaVersion}:${libs.versions.flink119.get()}"
compileOnly libs.flink119.connector.base
compileOnly libs.flink119.connector.files
compileOnly libs.flink120.metrics.dropwizard
compileOnly libs.flink120.streaming.java
compileOnly "${libs.flink120.streaming.java.get().module}:${libs.flink120.streaming.java.get().getVersion()}:tests"
compileOnly libs.flink120.table.api.java.bridge
compileOnly "org.apache.flink:flink-table-planner_${scalaVersion}:${libs.versions.flink120.get()}"
compileOnly libs.flink120.connector.base
compileOnly libs.flink120.connector.files

compileOnly libs.hadoop2.hdfs
compileOnly libs.hadoop2.common
Expand Down Expand Up @@ -68,13 +68,13 @@ project(":iceberg-flink:iceberg-flink-${flinkMajorVersion}") {

implementation libs.datasketches

testImplementation libs.flink119.connector.test.utils
testImplementation libs.flink119.core
testImplementation libs.flink119.runtime
testImplementation(libs.flink119.test.utilsjunit) {
testImplementation libs.flink120.connector.test.utils
testImplementation libs.flink120.core
testImplementation libs.flink120.runtime
testImplementation(libs.flink120.test.utilsjunit) {
exclude group: 'junit'
}
testImplementation(libs.flink119.test.utils) {
testImplementation(libs.flink120.test.utils) {
exclude group: "org.apache.curator", module: 'curator-test'
exclude group: 'junit'
}
Expand Down Expand Up @@ -168,7 +168,7 @@ project(":iceberg-flink:iceberg-flink-runtime-${flinkMajorVersion}") {
}

// for dropwizard histogram metrics implementation
implementation libs.flink119.metrics.dropwizard
implementation libs.flink120.metrics.dropwizard

// for integration testing with the flink-runtime-jar
// all of those dependencies are required because the integration test extends FlinkTestBase
Expand All @@ -178,13 +178,13 @@ project(":iceberg-flink:iceberg-flink-runtime-${flinkMajorVersion}") {
integrationImplementation project(path: ":iceberg-flink:iceberg-flink-${flinkMajorVersion}", configuration: "testArtifacts")
integrationImplementation project(path: ':iceberg-api', configuration: 'testArtifacts')
integrationImplementation project(path: ':iceberg-hive-metastore', configuration: 'testArtifacts')
integrationImplementation(libs.flink119.test.utils) {
integrationImplementation(libs.flink120.test.utils) {
exclude group: "org.apache.curator", module: 'curator-test'
exclude group: 'junit'
}

integrationImplementation libs.flink119.table.api.java.bridge
integrationImplementation "org.apache.flink:flink-table-planner_${scalaVersion}:${libs.versions.flink119.get()}"
integrationImplementation libs.flink120.table.api.java.bridge
integrationImplementation "org.apache.flink:flink-table-planner_${scalaVersion}:${libs.versions.flink120.get()}"

integrationImplementation libs.hadoop2.common
integrationImplementation libs.hadoop2.hdfs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.iceberg.flink.sink.shuffle;

import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
Expand All @@ -27,6 +28,8 @@
import org.apache.iceberg.Schema;
import org.apache.iceberg.SortKey;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.SortOrderComparators;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
Expand Down Expand Up @@ -66,6 +69,8 @@ public class MapRangePartitionerBenchmark {
Types.NestedField.required(9, "name9", Types.StringType.get()));

private static final SortOrder SORT_ORDER = SortOrder.builderFor(SCHEMA).asc("id").build();
private static final Comparator<StructLike> SORT_ORDER_COMPARATOR =
SortOrderComparators.forSchema(SCHEMA, SORT_ORDER);
private static final SortKey SORT_KEY = new SortKey(SCHEMA, SORT_ORDER);

private MapRangePartitioner partitioner;
Expand All @@ -82,10 +87,11 @@ public void setupBenchmark() {
mapStatistics.put(sortKey, weight);
});

MapDataStatistics dataStatistics = new MapDataStatistics(mapStatistics);
MapAssignment mapAssignment =
MapAssignment.fromKeyFrequency(2, mapStatistics, 0.0, SORT_ORDER_COMPARATOR);
this.partitioner =
new MapRangePartitioner(
SCHEMA, SortOrder.builderFor(SCHEMA).asc("id").build(), dataStatistics, 2);
SCHEMA, SortOrder.builderFor(SCHEMA).asc("id").build(), mapAssignment);

List<Integer> keys = Lists.newArrayList(weights.keySet().iterator());
long[] weightsCDF = new long[keys.size()];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ private static Configuration configure() {
Configuration configuration = new Configuration();
configuration.set(CoreOptions.CHECK_LEAKED_CLASSLOADER, false);
configuration.set(RestOptions.BIND_PORT, "0");
configuration.set(JobManagerOptions.SLOT_REQUEST_TIMEOUT, 5000L);
configuration.set(JobManagerOptions.SLOT_REQUEST_TIMEOUT, Duration.ofSeconds(5));

// Use FLIP-27 source
configuration.set(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_FLIP27_SOURCE, true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public class TestFlinkPackage {
/** This unit test would need to be adjusted as new Flink version is supported. */
@Test
public void testVersion() {
assertThat(FlinkPackage.version()).isEqualTo("1.19.0");
assertThat(FlinkPackage.version()).isEqualTo("1.20.0");
}

@Test
Expand Down
4 changes: 2 additions & 2 deletions gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
jmhOutputPath=build/reports/jmh/human-readable-output.txt
jmhJsonOutputPath=build/reports/jmh/results.json
jmhIncludeRegex=.*
systemProp.defaultFlinkVersions=1.19
systemProp.knownFlinkVersions=1.17,1.18,1.19
systemProp.defaultFlinkVersions=1.20
systemProp.knownFlinkVersions=1.18,1.19,1.20
systemProp.defaultHiveVersions=2
systemProp.knownHiveVersions=2,3
systemProp.defaultSparkVersions=3.5
Expand Down
24 changes: 12 additions & 12 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ delta-spark = "3.2.0"
esotericsoftware-kryo = "4.0.3"
errorprone-annotations = "2.29.2"
findbugs-jsr305 = "3.0.2"
flink117 = { strictly = "1.17.2"}
flink118 = { strictly = "1.18.1"}
flink119 = { strictly = "1.19.0"}
flink120 = { strictly = "1.20.0"}
google-libraries-bom = "26.43.0"
guava = "33.2.1-jre"
hadoop2 = "2.7.3"
Expand Down Expand Up @@ -108,12 +108,6 @@ datasketches = { module = "org.apache.datasketches:datasketches-java", version.r
delta-standalone = { module = "io.delta:delta-standalone_2.12", version.ref = "delta-standalone" }
errorprone-annotations = { module = "com.google.errorprone:error_prone_annotations", version.ref = "errorprone-annotations" }
findbugs-jsr305 = { module = "com.google.code.findbugs:jsr305", version.ref = "findbugs-jsr305" }
flink117-avro = { module = "org.apache.flink:flink-avro", version.ref = "flink117" }
flink117-connector-base = { module = "org.apache.flink:flink-connector-base", version.ref = "flink117" }
flink117-connector-files = { module = "org.apache.flink:flink-connector-files", version.ref = "flink117" }
flink117-metrics-dropwizard = { module = "org.apache.flink:flink-metrics-dropwizard", version.ref = "flink117" }
flink117-streaming-java = { module = "org.apache.flink:flink-streaming-java", version.ref = "flink117" }
flink117-table-api-java-bridge = { module = "org.apache.flink:flink-table-api-java-bridge", version.ref = "flink117" }
flink118-avro = { module = "org.apache.flink:flink-avro", version.ref = "flink118" }
flink118-connector-base = { module = "org.apache.flink:flink-connector-base", version.ref = "flink118" }
flink118-connector-files = { module = "org.apache.flink:flink-connector-files", version.ref = "flink118" }
Expand All @@ -126,6 +120,12 @@ flink119-connector-files = { module = "org.apache.flink:flink-connector-files",
flink119-metrics-dropwizard = { module = "org.apache.flink:flink-metrics-dropwizard", version.ref = "flink119" }
flink119-streaming-java = { module = "org.apache.flink:flink-streaming-java", version.ref = "flink119" }
flink119-table-api-java-bridge = { module = "org.apache.flink:flink-table-api-java-bridge", version.ref = "flink119" }
flink120-avro = { module = "org.apache.flink:flink-avro", version.ref = "flink120" }
flink120-connector-base = { module = "org.apache.flink:flink-connector-base", version.ref = "flink120" }
flink120-connector-files = { module = "org.apache.flink:flink-connector-files", version.ref = "flink120" }
flink120-metrics-dropwizard = { module = "org.apache.flink:flink-metrics-dropwizard", version.ref = "flink120" }
flink120-streaming-java = { module = "org.apache.flink:flink-streaming-java", version.ref = "flink120" }
flink120-table-api-java-bridge = { module = "org.apache.flink:flink-table-api-java-bridge", version.ref = "flink120" }
google-libraries-bom = { module = "com.google.cloud:libraries-bom", version.ref = "google-libraries-bom" }
guava-guava = { module = "com.google.guava:guava", version.ref = "guava" }
hadoop2-client = { module = "org.apache.hadoop:hadoop-client", version.ref = "hadoop2" }
Expand Down Expand Up @@ -180,11 +180,6 @@ assertj-core = { module = "org.assertj:assertj-core", version.ref = "assertj-cor
awaitility = { module = "org.awaitility:awaitility", version.ref = "awaitility" }
delta-spark = { module = "io.delta:delta-spark_2.12", version.ref = "delta-spark" }
esotericsoftware-kryo = { module = "com.esotericsoftware:kryo", version.ref = "esotericsoftware-kryo" }
flink117-connector-test-utils = { module = "org.apache.flink:flink-connector-test-utils", version.ref = "flink117" }
flink117-core = { module = "org.apache.flink:flink-core", version.ref = "flink117" }
flink117-runtime = { module = "org.apache.flink:flink-runtime", version.ref = "flink117" }
flink117-test-utils = { module = "org.apache.flink:flink-test-utils", version.ref = "flink117" }
flink117-test-utilsjunit = { module = "org.apache.flink:flink-test-utils-junit", version.ref = "flink117" }
flink118-connector-test-utils = { module = "org.apache.flink:flink-connector-test-utils", version.ref = "flink118" }
flink118-core = { module = "org.apache.flink:flink-core", version.ref = "flink118" }
flink118-runtime = { module = "org.apache.flink:flink-runtime", version.ref = "flink118" }
Expand All @@ -195,6 +190,11 @@ flink119-core = { module = "org.apache.flink:flink-core", version.ref = "flink11
flink119-runtime = { module = "org.apache.flink:flink-runtime", version.ref = "flink119" }
flink119-test-utils = { module = "org.apache.flink:flink-test-utils", version.ref = "flink119" }
flink119-test-utilsjunit = { module = "org.apache.flink:flink-test-utils-junit", version.ref = "flink119" }
flink120-connector-test-utils = { module = "org.apache.flink:flink-connector-test-utils", version.ref = "flink120" }
flink120-core = { module = "org.apache.flink:flink-core", version.ref = "flink120" }
flink120-runtime = { module = "org.apache.flink:flink-runtime", version.ref = "flink120" }
flink120-test-utils = { module = "org.apache.flink:flink-test-utils", version.ref = "flink120" }
flink120-test-utilsjunit = { module = "org.apache.flink:flink-test-utils-junit", version.ref = "flink120" }
guava-testlib = { module = "com.google.guava:guava-testlib", version.ref = "guava" }
jakarta-el-api = { module = "jakarta.el:jakarta.el-api", version.ref = "jakarta-el-api" }
jakarta-servlet = {module = "jakarta.servlet:jakarta.servlet-api", version.ref = "jakarta-servlet-api"}
Expand Down
12 changes: 6 additions & 6 deletions jmh.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,16 @@ def sparkVersions = (System.getProperty("sparkVersions") != null ? System.getPro
def scalaVersion = System.getProperty("scalaVersion") != null ? System.getProperty("scalaVersion") : System.getProperty("defaultScalaVersion")
def jmhProjects = [project(":iceberg-core"), project(":iceberg-data")]

if (flinkVersions.contains("1.16")) {
jmhProjects.add(project(":iceberg-flink:iceberg-flink-1.16"))
if (flinkVersions.contains("1.18")) {
jmhProjects.add(project(":iceberg-flink:iceberg-flink-1.18"))
}

if (flinkVersions.contains("1.17")) {
jmhProjects.add(project(":iceberg-flink:iceberg-flink-1.17"))
if (flinkVersions.contains("1.19")) {
jmhProjects.add(project(":iceberg-flink:iceberg-flink-1.19"))
}

if (flinkVersions.contains("1.18")) {
jmhProjects.add(project(":iceberg-flink:iceberg-flink-1.18"))
if (flinkVersions.contains("1.20")) {
jmhProjects.add(project(":iceberg-flink:iceberg-flink-1.20"))
}

if (sparkVersions.contains("3.3")) {
Expand Down
18 changes: 9 additions & 9 deletions settings.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -112,15 +112,6 @@ if (!flinkVersions.isEmpty()) {
project(':flink').name = 'iceberg-flink'
}

if (flinkVersions.contains("1.17")) {
include ":iceberg-flink:flink-1.17"
include ":iceberg-flink:flink-runtime-1.17"
project(":iceberg-flink:flink-1.17").projectDir = file('flink/v1.17/flink')
project(":iceberg-flink:flink-1.17").name = "iceberg-flink-1.17"
project(":iceberg-flink:flink-runtime-1.17").projectDir = file('flink/v1.17/flink-runtime')
project(":iceberg-flink:flink-runtime-1.17").name = "iceberg-flink-runtime-1.17"
}

if (flinkVersions.contains("1.18")) {
include ":iceberg-flink:flink-1.18"
include ":iceberg-flink:flink-runtime-1.18"
Expand All @@ -139,6 +130,15 @@ if (flinkVersions.contains("1.19")) {
project(":iceberg-flink:flink-runtime-1.19").name = "iceberg-flink-runtime-1.19"
}

if (flinkVersions.contains("1.20")) {
include ":iceberg-flink:flink-1.20"
include ":iceberg-flink:flink-runtime-1.20"
project(":iceberg-flink:flink-1.20").projectDir = file('flink/v1.20/flink')
project(":iceberg-flink:flink-1.20").name = "iceberg-flink-1.20"
project(":iceberg-flink:flink-runtime-1.20").projectDir = file('flink/v1.20/flink-runtime')
project(":iceberg-flink:flink-runtime-1.20").name = "iceberg-flink-runtime-1.20"
}

if (sparkVersions.contains("3.3")) {
include ":iceberg-spark:spark-3.3_${scalaVersion}"
include ":iceberg-spark:spark-extensions-3.3_${scalaVersion}"
Expand Down

0 comments on commit 38733f8

Please sign in to comment.