From 95c39b83596f5bd630219379cf6ebbde2d073dda Mon Sep 17 00:00:00 2001 From: Alex Date: Tue, 30 Aug 2022 16:03:11 -0500 Subject: [PATCH] [SPARK-40256][BUILD][K8S] Switch base image from openjdk to eclipse-temurin ### What changes were proposed in this pull request? This PR switches the base image from [`openjdk`](https://hub.docker.com/_/openjdk) to [`eclipse-temurin`](https://hub.docker.com/_/eclipse-temurin) (original openjdk). The core change is: the OS of the base image changes from `debian-bullseye` to `ubuntu-focal` (based on debian bullseye). ### Why are the changes needed? - According to https://github.com/docker-library/openjdk/issues/505 and https://github.com/docker-library/docs/pull/2162, the openjdk:8/11 image is EOL and Eclipse Temurin replaces it; the original openjdk image will `remove the 11 and 8 tags (in October 2022, perhaps)` (we are using it in spark), so we have to switch before that happens. - The `openjdk` image is [not updated anymore](https://adoptopenjdk.net/upstream.html) (the last releases were 8u342 and 11.0.16, and Eclipse Temurin is the replacement recommended by adoptopenjdk), which means that even if the 8/11 tags are not removed, we still need to switch away from `openjdk`. - [Many docker official images](https://github.com/search?q=org%3Adocker-library+temurin&type=code) have already switched from openjdk to eclipse-temurin. - According to the [jvm ecosystem report](https://snyk.io/jvm-ecosystem-report-2021) from https://github.com/docker-library/docs/pull/2162 , AdoptOpenJDK (now donated to the eclipse foundation and renamed to eclipse temurin) builds of OpenJDK are the most popular in production. - An ideal long-term solution is that we only choose the jdk version and leave the adaptation of OS to the corresponding openjdk official image (just like eclipse-temurin supports [ubuntu, alpine, centos](https://github.com/adoptium/containers/tree/main/11/jre)) - The alternate solution is we just switch the `openjdk` image to `debian-bullseye` with openjdk 11 installation.
like: https://github.com/Yikun/spark/pull/163. But it makes the spark image **depend on debian OS more**, which means it will be difficult to support Java versions that debian OS doesn't support (for example, openjdk-8-jre is not supported in current debian anymore). For the above reasons, I think `eclipse-temurin` is a good choice. ### Does this PR introduce _any_ user-facing change? Yes, the docker images' base image changes. ### How was this patch tested? CI passed, and I also ran a local test on: https://github.com/Yikun/spark/pull/162 Closes #37705 from Yikun/switch-temurin. Authored-by: Yikun Jiang Signed-off-by: Gengliang Wang --- bin/docker-image-tool.sh | 4 +- .../main/resources/error/error-classes.json | 12 +++ dev/.scalafmt.conf | 11 ++- dev/scalafmt | 2 +- pom.xml | 2 +- .../src/main/dockerfiles/spark/Dockerfile | 5 +- .../scripts/setup-integration-test-env.sh | 2 +- .../apache/spark/sql/AnalysisException.scala | 2 +- .../sql/errors/QueryCompilationErrors.scala | 11 +-- .../v2/parquet/ParquetScanBuilder.scala | 26 +++--- .../results/group-by-ordinal.sql.out | 81 ++++++++++++++++--- .../postgreSQL/select_implicit.sql.out | 9 ++- .../postgreSQL/udf-select_implicit.sql.out | 9 ++- 13 files changed, 132 insertions(+), 44 deletions(-) diff --git a/bin/docker-image-tool.sh b/bin/docker-image-tool.sh index ad31bd1e7b..037e0c70b0 100755 --- a/bin/docker-image-tool.sh +++ b/bin/docker-image-tool.sh @@ -269,11 +269,11 @@ Examples: $0 -r docker.io/myrepo -t v2.3.0 push - Build and push Java11-based image with tag "v3.0.0" to docker.io/myrepo - $0 -r docker.io/myrepo -t v3.0.0 -b java_image_tag=11-jre-slim build + $0 -r docker.io/myrepo -t v3.0.0 -b java_image_tag=11-jre-focal build $0 -r docker.io/myrepo -t v3.0.0 push - Build and push Java11-based image for multiple archs to docker.io/myrepo - $0 -r docker.io/myrepo -t v3.0.0 -X -b java_image_tag=11-jre-slim build + $0 -r docker.io/myrepo -t v3.0.0 -X -b java_image_tag=11-jre-focal build # Note: buildx, which does cross 
building, needs to do the push during build # So there is no separate push step with -X diff --git a/core/src/main/resources/error/error-classes.json b/core/src/main/resources/error/error-classes.json index 816df79e50..df0f887a63 100644 --- a/core/src/main/resources/error/error-classes.json +++ b/core/src/main/resources/error/error-classes.json @@ -136,6 +136,18 @@ "Grouping sets size cannot be greater than " ] }, + "GROUP_BY_POS_OUT_OF_RANGE" : { + "message" : [ + "GROUP BY position <index> is not in select list (valid range is [1, <size>])." + ], + "sqlState" : "42000" + }, + "GROUP_BY_POS_REFERS_AGG_EXPR" : { + "message" : [ + "GROUP BY <index> refers to an expression <aggExpr> that contains an aggregate function. Aggregate functions are not allowed in GROUP BY." + ], + "sqlState" : "42000" + }, "INCOMPARABLE_PIVOT_COLUMN" : { "message" : [ "Invalid pivot column . Pivot columns must be comparable." diff --git a/dev/.scalafmt.conf b/dev/.scalafmt.conf index d2196e601a..202f35df4d 100644 --- a/dev/.scalafmt.conf +++ b/dev/.scalafmt.conf @@ -22,6 +22,13 @@ align.tokens = [] optIn = { configStyleArguments = false } -danglingParentheses = false -docstrings = JavaDoc +danglingParentheses.preset = false +docstrings.style = Asterisk maxColumn = 98 +runner.dialect = scala212 +fileOverride { + "glob:**/src/**/scala-2.13/**.scala" { + runner.dialect = scala213 + } +} +version = 3.5.9 diff --git a/dev/scalafmt b/dev/scalafmt index 3f69bc98f5..56ff75fe7d 100755 --- a/dev/scalafmt +++ b/dev/scalafmt @@ -18,5 +18,5 @@ # VERSION="${@:-2.12}" -./build/mvn -Pscala-$VERSION mvn-scalafmt_$VERSION:format -Dscalafmt.skip=false +./build/mvn -Pscala-$VERSION scalafmt:format -Dscalafmt.skip=false diff --git a/pom.xml b/pom.xml index 6b62ef3393..6aca631722 100644 --- a/pom.xml +++ b/pom.xml @@ -3404,7 +3404,7 @@ org.antipathy mvn-scalafmt_${scala.binary.version} - 1.0.4 + 1.1.1640084764.9f463a9 ${scalafmt.skip} ${scalafmt.skip} diff --git a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/Dockerfile 
b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/Dockerfile index 9f4e2462f3..fc529afb1b 100644 --- a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/Dockerfile +++ b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/Dockerfile @@ -14,9 +14,9 @@ # See the License for the specific language governing permissions and # limitations under the License. # -ARG java_image_tag=11-jre-slim +ARG java_image_tag=11-jre-focal -FROM openjdk:${java_image_tag} +FROM eclipse-temurin:${java_image_tag} ARG spark_uid=185 @@ -28,7 +28,6 @@ ARG spark_uid=185 # docker build -t spark:latest -f kubernetes/dockerfiles/spark/Dockerfile . RUN set -ex && \ - sed -i 's/http:\/\/deb.\(.*\)/https:\/\/deb.\1/g' /etc/apt/sources.list && \ apt-get update && \ ln -s /lib /lib64 && \ apt install -y bash tini libc6 libpam-modules krb5-user libnss3 procps net-tools && \ diff --git a/resource-managers/kubernetes/integration-tests/scripts/setup-integration-test-env.sh b/resource-managers/kubernetes/integration-tests/scripts/setup-integration-test-env.sh index d8960349f0..721a60b0ae 100755 --- a/resource-managers/kubernetes/integration-tests/scripts/setup-integration-test-env.sh +++ b/resource-managers/kubernetes/integration-tests/scripts/setup-integration-test-env.sh @@ -103,7 +103,7 @@ then cd $SPARK_INPUT_DIR if [[ $DOCKER_FILE == "N/A" ]]; then - # OpenJDK base-image tag (e.g. 8-jre-slim, 11-jre-slim) + # OpenJDK base-image tag (e.g. 
8-jre-focal, 11-jre-focal) JAVA_IMAGE_TAG_BUILD_ARG="-b java_image_tag=$JAVA_IMAGE_TAG" else if [[ $DOCKER_FILE = /* ]]; then diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/AnalysisException.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/AnalysisException.scala index 9ab0b223e1..48e1f91990 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/AnalysisException.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/AnalysisException.scala @@ -100,7 +100,7 @@ class AnalysisException protected[sql] ( line = origin.line, startPosition = origin.startPosition, errorClass = Some(errorClass), - errorSubClass = Some(errorSubClass), + errorSubClass = Option(errorSubClass), messageParameters = messageParameters) def copy( diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala index 20c3c81b25..7458e201be 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala @@ -366,14 +366,15 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase { def groupByPositionRefersToAggregateFunctionError( index: Int, expr: Expression): Throwable = { - new AnalysisException(s"GROUP BY $index refers to an expression that is or contains " + - "an aggregate function. 
Aggregate functions are not allowed in GROUP BY, " + - s"but got ${expr.sql}") + new AnalysisException( + errorClass = "GROUP_BY_POS_REFERS_AGG_EXPR", + messageParameters = Array(index.toString, expr.sql)) } def groupByPositionRangeError(index: Int, size: Int): Throwable = { - new AnalysisException(s"GROUP BY position $index is not in select list " + - s"(valid range is [1, $size])") + new AnalysisException( + errorClass = "GROUP_BY_POS_OUT_OF_RANGE", + messageParameters = Array(index.toString, size.toString)) } def generatorNotExpectedError(name: FunctionIdentifier, classCanonicalName: String): Throwable = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScanBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScanBuilder.scala index 39a81e6563..84ad3b1353 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScanBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScanBuilder.scala @@ -22,7 +22,6 @@ import scala.collection.JavaConverters._ import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec import org.apache.spark.sql.connector.expressions.aggregate.Aggregation -import org.apache.spark.sql.connector.expressions.filter.Predicate import org.apache.spark.sql.connector.read.{Scan, SupportsPushDownAggregates} import org.apache.spark.sql.execution.datasources.{AggregatePushDownUtils, PartitioningAwareFileIndex} import org.apache.spark.sql.execution.datasources.parquet.{ParquetFilters, SparkToParquetSchemaConverter} @@ -46,7 +45,13 @@ case class ParquetScanBuilder( sparkSession.sessionState.newHadoopConfWithOptions(caseSensitiveMap) } - lazy val pushedParquetFilters = { + private var finalSchema = new StructType() + + private var pushedAggregations = Option.empty[Aggregation] + + override protected val 
supportsNestedSchemaPruning: Boolean = true + + override def pushDataFilters(dataFilters: Array[Filter]): Array[Filter] = { val sqlConf = sparkSession.sessionState.conf if (sqlConf.parquetFilterPushDown) { val pushDownDate = sqlConf.parquetFilterPushDownDate @@ -68,25 +73,12 @@ case class ParquetScanBuilder( // The rebase mode doesn't matter here because the filters are used to determine // whether they is convertible. RebaseSpec(LegacyBehaviorPolicy.CORRECTED)) - parquetFilters.convertibleFilters(pushedDataFilters).toArray + parquetFilters.convertibleFilters(dataFilters).toArray } else { Array.empty[Filter] } } - private var finalSchema = new StructType() - - private var pushedAggregations = Option.empty[Aggregation] - - override protected val supportsNestedSchemaPruning: Boolean = true - - override def pushDataFilters(dataFilters: Array[Filter]): Array[Filter] = dataFilters - - // Note: for Parquet, the actual filter push down happens in [[ParquetPartitionReaderFactory]]. - // It requires the Parquet physical schema to determine whether a filter is convertible. - // All filters that can be converted to Parquet are pushed down. 
- override def pushedFilters: Array[Predicate] = pushedParquetFilters.map(_.toV2) - override def pushAggregation(aggregation: Aggregation): Boolean = { if (!sparkSession.sessionState.conf.parquetAggregatePushDown) { return false @@ -114,7 +106,7 @@ case class ParquetScanBuilder( finalSchema = readDataSchema() } ParquetScan(sparkSession, hadoopConf, fileIndex, dataSchema, finalSchema, - readPartitionSchema(), pushedParquetFilters, options, pushedAggregations, + readPartitionSchema(), pushedDataFilters, options, pushedAggregations, partitionFilters, dataFilters) } } diff --git a/sql/core/src/test/resources/sql-tests/results/group-by-ordinal.sql.out b/sql/core/src/test/resources/sql-tests/results/group-by-ordinal.sql.out index 0e4ec436b3..10b244c1c4 100644 --- a/sql/core/src/test/resources/sql-tests/results/group-by-ordinal.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/group-by-ordinal.sql.out @@ -92,7 +92,14 @@ select a, b from data group by -1 struct<> -- !query output org.apache.spark.sql.AnalysisException -GROUP BY position -1 is not in select list (valid range is [1, 2]); line 1 pos 31 +{ + "errorClass" : "GROUP_BY_POS_OUT_OF_RANGE", + "sqlState" : "42000", + "messageParameters" : { + "index" : "-1", + "size" : "2" + } +} -- !query @@ -101,7 +108,14 @@ select a, b from data group by 0 struct<> -- !query output org.apache.spark.sql.AnalysisException -GROUP BY position 0 is not in select list (valid range is [1, 2]); line 1 pos 31 +{ + "errorClass" : "GROUP_BY_POS_OUT_OF_RANGE", + "sqlState" : "42000", + "messageParameters" : { + "index" : "0", + "size" : "2" + } +} -- !query @@ -110,7 +124,14 @@ select a, b from data group by 3 struct<> -- !query output org.apache.spark.sql.AnalysisException -GROUP BY position 3 is not in select list (valid range is [1, 2]); line 1 pos 31 +{ + "errorClass" : "GROUP_BY_POS_OUT_OF_RANGE", + "sqlState" : "42000", + "messageParameters" : { + "index" : "3", + "size" : "2" + } +} -- !query @@ -119,7 +140,14 @@ select a, b, 
sum(b) from data group by 3 struct<> -- !query output org.apache.spark.sql.AnalysisException -GROUP BY 3 refers to an expression that is or contains an aggregate function. Aggregate functions are not allowed in GROUP BY, but got sum(data.b) AS `sum(b)`; line 1 pos 39 +{ + "errorClass" : "GROUP_BY_POS_REFERS_AGG_EXPR", + "sqlState" : "42000", + "messageParameters" : { + "index" : "3", + "aggExpr" : "sum(data.b) AS `sum(b)`" + } +} -- !query @@ -128,7 +156,14 @@ select a, b, sum(b) + 2 from data group by 3 struct<> -- !query output org.apache.spark.sql.AnalysisException -GROUP BY 3 refers to an expression that is or contains an aggregate function. Aggregate functions are not allowed in GROUP BY, but got (sum(data.b) + CAST(2 AS BIGINT)) AS `(sum(b) + 2)`; line 1 pos 43 +{ + "errorClass" : "GROUP_BY_POS_REFERS_AGG_EXPR", + "sqlState" : "42000", + "messageParameters" : { + "index" : "3", + "aggExpr" : "(sum(data.b) + CAST(2 AS BIGINT)) AS `(sum(b) + 2)`" + } +} -- !query @@ -349,7 +384,14 @@ select a, b, count(1) from data group by a, -1 struct<> -- !query output org.apache.spark.sql.AnalysisException -GROUP BY position -1 is not in select list (valid range is [1, 3]); line 1 pos 44 +{ + "errorClass" : "GROUP_BY_POS_OUT_OF_RANGE", + "sqlState" : "42000", + "messageParameters" : { + "index" : "-1", + "size" : "3" + } +} -- !query @@ -358,7 +400,14 @@ select a, b, count(1) from data group by a, 3 struct<> -- !query output org.apache.spark.sql.AnalysisException -GROUP BY 3 refers to an expression that is or contains an aggregate function. 
Aggregate functions are not allowed in GROUP BY, but got count(1) AS `count(1)`; line 1 pos 44 +{ + "errorClass" : "GROUP_BY_POS_REFERS_AGG_EXPR", + "sqlState" : "42000", + "messageParameters" : { + "index" : "3", + "aggExpr" : "count(1) AS `count(1)`" + } +} -- !query @@ -367,7 +416,14 @@ select a, b, count(1) from data group by cube(-1, 2) struct<> -- !query output org.apache.spark.sql.AnalysisException -GROUP BY position -1 is not in select list (valid range is [1, 3]); line 1 pos 46 +{ + "errorClass" : "GROUP_BY_POS_OUT_OF_RANGE", + "sqlState" : "42000", + "messageParameters" : { + "index" : "-1", + "size" : "3" + } +} -- !query @@ -376,7 +432,14 @@ select a, b, count(1) from data group by cube(1, 3) struct<> -- !query output org.apache.spark.sql.AnalysisException -GROUP BY 3 refers to an expression that is or contains an aggregate function. Aggregate functions are not allowed in GROUP BY, but got count(1) AS `count(1)`; line 1 pos 49 +{ + "errorClass" : "GROUP_BY_POS_REFERS_AGG_EXPR", + "sqlState" : "42000", + "messageParameters" : { + "index" : "3", + "aggExpr" : "count(1) AS `count(1)`" + } +} -- !query diff --git a/sql/core/src/test/resources/sql-tests/results/postgreSQL/select_implicit.sql.out b/sql/core/src/test/resources/sql-tests/results/postgreSQL/select_implicit.sql.out index d39f6101ac..cd5bc39d7c 100644 --- a/sql/core/src/test/resources/sql-tests/results/postgreSQL/select_implicit.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/postgreSQL/select_implicit.sql.out @@ -207,7 +207,14 @@ SELECT c, count(*) FROM test_missing_target GROUP BY 3 struct<> -- !query output org.apache.spark.sql.AnalysisException -GROUP BY position 3 is not in select list (valid range is [1, 2]); line 1 pos 53 +{ + "errorClass" : "GROUP_BY_POS_OUT_OF_RANGE", + "sqlState" : "42000", + "messageParameters" : { + "index" : "3", + "size" : "2" + } +} -- !query diff --git a/sql/core/src/test/resources/sql-tests/results/udf/postgreSQL/udf-select_implicit.sql.out 
b/sql/core/src/test/resources/sql-tests/results/udf/postgreSQL/udf-select_implicit.sql.out index 7628007487..db2a855bf0 100755 --- a/sql/core/src/test/resources/sql-tests/results/udf/postgreSQL/udf-select_implicit.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/udf/postgreSQL/udf-select_implicit.sql.out @@ -210,7 +210,14 @@ SELECT udf(c), udf(count(*)) FROM test_missing_target GROUP BY 3 struct<> -- !query output org.apache.spark.sql.AnalysisException -GROUP BY position 3 is not in select list (valid range is [1, 2]); line 1 pos 63 +{ + "errorClass" : "GROUP_BY_POS_OUT_OF_RANGE", + "sqlState" : "42000", + "messageParameters" : { + "index" : "3", + "size" : "2" + } +} -- !query