Skip to content

Commit

Permalink
[SPARK-40256][BUILD][K8S] Switch base image from openjdk to eclipse-t…
Browse files Browse the repository at this point in the history
…emurin

### What changes were proposed in this pull request?
This PR switchs the base image from [`openjdk`](https://hub.docker.com/_/openjdk) to [`eclipse-temurin`](https://hub.docker.com/_/eclipse-temurin) (original openjdk).

The core change is: the OS of base image changes `debian-bullseye` to `ubuntu-focal` (based on debian bullseye).

### Why are the changes needed?

- According to docker-library/openjdk#505 and docker-library/docs#2162, openjdk:8/11 image is EOL and Eclipse Temurin replaces this, the original openjdk image will `remove the 11 and 8 tags (in October 2022, perhaps)` (we are using it in spark), so we have to switch this before it happens.

- The `openjdk` is [not update anymore](https://adoptopenjdk.net/upstream.html) (the last releases were 8u342 and 11.0.16, Eclipse Temurin replace is recommanded by adoptopenjdk) that means even the 8/11 tag is not removed, we still need to switch `openjdk`.

- There were [many docker official image](https://github.com/search?q=org%3Adocker-library+temurin&type=code) already switch openjdk to eclipse-temurin.

- According the [jvm ecosystem report](https://snyk.io/jvm-ecosystem-report-2021) from docker-library/docs#2162 , AdoptOpenJDK(now donation to eclipse foundation and rename to eclipse temurin) builds of OpenJDK most popular in production.

- An ideal long-term solution is that we only choose the jdk version and leave the adaptation of OS to the corresponding openjdk official image (just like eclipse-temurin are suppoort [ubuntu, alpine, centos](https://github.com/adoptium/containers/tree/main/11/jre))

- The alternate solution is we just swith `openjdk` image to `debian-bullseye` with openjdk 11 installation. like: Yikun/spark#163. But it makes spark image **depends on debian OS more**, that means we will diffcult to support the Java version which debian OS doesn't support (such as openjdk-8-jre is not be supported in current debian anymore).

For the above reason, I think `eclipse-temurin` is a good choice.

### Does this PR introduce _any_ user-facing change?

Yes, the docker images base image changes.

### How was this patch tested?
CI passed, I also have a local test on: Yikun/spark#162

Closes #37705 from Yikun/switch-temurin.

Authored-by: Yikun Jiang <[email protected]>
Signed-off-by: Gengliang Wang <[email protected]>
  • Loading branch information
a0x8o committed Dec 30, 2022
1 parent 278a7e2 commit 0d1352d
Show file tree
Hide file tree
Showing 13 changed files with 132 additions and 44 deletions.
4 changes: 2 additions & 2 deletions bin/docker-image-tool.sh
Original file line number Diff line number Diff line change
Expand Up @@ -269,11 +269,11 @@ Examples:
$0 -r docker.io/myrepo -t v2.3.0 push
- Build and push Java11-based image with tag "v3.0.0" to docker.io/myrepo
$0 -r docker.io/myrepo -t v3.0.0 -b java_image_tag=11-jre-slim build
$0 -r docker.io/myrepo -t v3.0.0 -b java_image_tag=11-jre-focal build
$0 -r docker.io/myrepo -t v3.0.0 push
- Build and push Java11-based image for multiple archs to docker.io/myrepo
$0 -r docker.io/myrepo -t v3.0.0 -X -b java_image_tag=11-jre-slim build
$0 -r docker.io/myrepo -t v3.0.0 -X -b java_image_tag=11-jre-focal build
# Note: buildx, which does cross building, needs to do the push during build
# So there is no separate push step with -X
Expand Down
12 changes: 12 additions & 0 deletions core/src/main/resources/error/error-classes.json
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,18 @@
"Grouping sets size cannot be greater than <maxSize>"
]
},
"GROUP_BY_POS_OUT_OF_RANGE" : {
"message" : [
"GROUP BY position <index> is not in select list (valid range is [1, <size>])."
],
"sqlState" : "42000"
},
"GROUP_BY_POS_REFERS_AGG_EXPR" : {
"message" : [
"GROUP BY <index> refers to an expression <aggExpr> that contains an aggregate function. Aggregate functions are not allowed in GROUP BY."
],
"sqlState" : "42000"
},
"INCOMPARABLE_PIVOT_COLUMN" : {
"message" : [
"Invalid pivot column <columnName>. Pivot columns must be comparable."
Expand Down
11 changes: 9 additions & 2 deletions dev/.scalafmt.conf
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,13 @@ align.tokens = []
optIn = {
configStyleArguments = false
}
danglingParentheses = false
docstrings = JavaDoc
danglingParentheses.preset = false
docstrings.style = Asterisk
maxColumn = 98
runner.dialect = scala212
fileOverride {
"glob:**/src/**/scala-2.13/**.scala" {
runner.dialect = scala213
}
}
version = 3.5.9
2 changes: 1 addition & 1 deletion dev/scalafmt
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,5 @@
#

VERSION="${@:-2.12}"
./build/mvn -Pscala-$VERSION mvn-scalafmt_$VERSION:format -Dscalafmt.skip=false
./build/mvn -Pscala-$VERSION scalafmt:format -Dscalafmt.skip=false

2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3404,7 +3404,7 @@
<plugin>
<groupId>org.antipathy</groupId>
<artifactId>mvn-scalafmt_${scala.binary.version}</artifactId>
<version>1.0.4</version>
<version>1.1.1640084764.9f463a9</version>
<configuration>
<validateOnly>${scalafmt.skip}</validateOnly> <!-- (Optional) skip formatting -->
<skipSources>${scalafmt.skip}</skipSources>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
ARG java_image_tag=11-jre-slim
ARG java_image_tag=11-jre-focal

FROM openjdk:${java_image_tag}
FROM eclipse-temurin:${java_image_tag}

ARG spark_uid=185

Expand All @@ -28,7 +28,6 @@ ARG spark_uid=185
# docker build -t spark:latest -f kubernetes/dockerfiles/spark/Dockerfile .

RUN set -ex && \
sed -i 's/http:\/\/deb.\(.*\)/https:\/\/deb.\1/g' /etc/apt/sources.list && \
apt-get update && \
ln -s /lib /lib64 && \
apt install -y bash tini libc6 libpam-modules krb5-user libnss3 procps net-tools && \
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ then
cd $SPARK_INPUT_DIR

if [[ $DOCKER_FILE == "N/A" ]]; then
# OpenJDK base-image tag (e.g. 8-jre-slim, 11-jre-slim)
# OpenJDK base-image tag (e.g. 8-jre-focal, 11-jre-focal)
JAVA_IMAGE_TAG_BUILD_ARG="-b java_image_tag=$JAVA_IMAGE_TAG"
else
if [[ $DOCKER_FILE = /* ]]; then
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ class AnalysisException protected[sql] (
line = origin.line,
startPosition = origin.startPosition,
errorClass = Some(errorClass),
errorSubClass = Some(errorSubClass),
errorSubClass = Option(errorSubClass),
messageParameters = messageParameters)

def copy(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -366,14 +366,15 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase {
def groupByPositionRefersToAggregateFunctionError(
index: Int,
expr: Expression): Throwable = {
new AnalysisException(s"GROUP BY $index refers to an expression that is or contains " +
"an aggregate function. Aggregate functions are not allowed in GROUP BY, " +
s"but got ${expr.sql}")
new AnalysisException(
errorClass = "GROUP_BY_POS_REFERS_AGG_EXPR",
messageParameters = Array(index.toString, expr.sql))
}

def groupByPositionRangeError(index: Int, size: Int): Throwable = {
new AnalysisException(s"GROUP BY position $index is not in select list " +
s"(valid range is [1, $size])")
new AnalysisException(
errorClass = "GROUP_BY_POS_OUT_OF_RANGE",
messageParameters = Array(index.toString, size.toString))
}

def generatorNotExpectedError(name: FunctionIdentifier, classCanonicalName: String): Throwable = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import scala.collection.JavaConverters._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec
import org.apache.spark.sql.connector.expressions.aggregate.Aggregation
import org.apache.spark.sql.connector.expressions.filter.Predicate
import org.apache.spark.sql.connector.read.{Scan, SupportsPushDownAggregates}
import org.apache.spark.sql.execution.datasources.{AggregatePushDownUtils, PartitioningAwareFileIndex}
import org.apache.spark.sql.execution.datasources.parquet.{ParquetFilters, SparkToParquetSchemaConverter}
Expand All @@ -46,7 +45,13 @@ case class ParquetScanBuilder(
sparkSession.sessionState.newHadoopConfWithOptions(caseSensitiveMap)
}

lazy val pushedParquetFilters = {
private var finalSchema = new StructType()

private var pushedAggregations = Option.empty[Aggregation]

override protected val supportsNestedSchemaPruning: Boolean = true

override def pushDataFilters(dataFilters: Array[Filter]): Array[Filter] = {
val sqlConf = sparkSession.sessionState.conf
if (sqlConf.parquetFilterPushDown) {
val pushDownDate = sqlConf.parquetFilterPushDownDate
Expand All @@ -68,25 +73,12 @@ case class ParquetScanBuilder(
// The rebase mode doesn't matter here because the filters are used to determine
// whether they is convertible.
RebaseSpec(LegacyBehaviorPolicy.CORRECTED))
parquetFilters.convertibleFilters(pushedDataFilters).toArray
parquetFilters.convertibleFilters(dataFilters).toArray
} else {
Array.empty[Filter]
}
}

private var finalSchema = new StructType()

private var pushedAggregations = Option.empty[Aggregation]

override protected val supportsNestedSchemaPruning: Boolean = true

override def pushDataFilters(dataFilters: Array[Filter]): Array[Filter] = dataFilters

// Note: for Parquet, the actual filter push down happens in [[ParquetPartitionReaderFactory]].
// It requires the Parquet physical schema to determine whether a filter is convertible.
// All filters that can be converted to Parquet are pushed down.
override def pushedFilters: Array[Predicate] = pushedParquetFilters.map(_.toV2)

override def pushAggregation(aggregation: Aggregation): Boolean = {
if (!sparkSession.sessionState.conf.parquetAggregatePushDown) {
return false
Expand Down Expand Up @@ -114,7 +106,7 @@ case class ParquetScanBuilder(
finalSchema = readDataSchema()
}
ParquetScan(sparkSession, hadoopConf, fileIndex, dataSchema, finalSchema,
readPartitionSchema(), pushedParquetFilters, options, pushedAggregations,
readPartitionSchema(), pushedDataFilters, options, pushedAggregations,
partitionFilters, dataFilters)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,14 @@ select a, b from data group by -1
struct<>
-- !query output
org.apache.spark.sql.AnalysisException
GROUP BY position -1 is not in select list (valid range is [1, 2]); line 1 pos 31
{
"errorClass" : "GROUP_BY_POS_OUT_OF_RANGE",
"sqlState" : "42000",
"messageParameters" : {
"index" : "-1",
"size" : "2"
}
}


-- !query
Expand All @@ -101,7 +108,14 @@ select a, b from data group by 0
struct<>
-- !query output
org.apache.spark.sql.AnalysisException
GROUP BY position 0 is not in select list (valid range is [1, 2]); line 1 pos 31
{
"errorClass" : "GROUP_BY_POS_OUT_OF_RANGE",
"sqlState" : "42000",
"messageParameters" : {
"index" : "0",
"size" : "2"
}
}


-- !query
Expand All @@ -110,7 +124,14 @@ select a, b from data group by 3
struct<>
-- !query output
org.apache.spark.sql.AnalysisException
GROUP BY position 3 is not in select list (valid range is [1, 2]); line 1 pos 31
{
"errorClass" : "GROUP_BY_POS_OUT_OF_RANGE",
"sqlState" : "42000",
"messageParameters" : {
"index" : "3",
"size" : "2"
}
}


-- !query
Expand All @@ -119,7 +140,14 @@ select a, b, sum(b) from data group by 3
struct<>
-- !query output
org.apache.spark.sql.AnalysisException
GROUP BY 3 refers to an expression that is or contains an aggregate function. Aggregate functions are not allowed in GROUP BY, but got sum(data.b) AS `sum(b)`; line 1 pos 39
{
"errorClass" : "GROUP_BY_POS_REFERS_AGG_EXPR",
"sqlState" : "42000",
"messageParameters" : {
"index" : "3",
"aggExpr" : "sum(data.b) AS `sum(b)`"
}
}


-- !query
Expand All @@ -128,7 +156,14 @@ select a, b, sum(b) + 2 from data group by 3
struct<>
-- !query output
org.apache.spark.sql.AnalysisException
GROUP BY 3 refers to an expression that is or contains an aggregate function. Aggregate functions are not allowed in GROUP BY, but got (sum(data.b) + CAST(2 AS BIGINT)) AS `(sum(b) + 2)`; line 1 pos 43
{
"errorClass" : "GROUP_BY_POS_REFERS_AGG_EXPR",
"sqlState" : "42000",
"messageParameters" : {
"index" : "3",
"aggExpr" : "(sum(data.b) + CAST(2 AS BIGINT)) AS `(sum(b) + 2)`"
}
}


-- !query
Expand Down Expand Up @@ -349,7 +384,14 @@ select a, b, count(1) from data group by a, -1
struct<>
-- !query output
org.apache.spark.sql.AnalysisException
GROUP BY position -1 is not in select list (valid range is [1, 3]); line 1 pos 44
{
"errorClass" : "GROUP_BY_POS_OUT_OF_RANGE",
"sqlState" : "42000",
"messageParameters" : {
"index" : "-1",
"size" : "3"
}
}


-- !query
Expand All @@ -358,7 +400,14 @@ select a, b, count(1) from data group by a, 3
struct<>
-- !query output
org.apache.spark.sql.AnalysisException
GROUP BY 3 refers to an expression that is or contains an aggregate function. Aggregate functions are not allowed in GROUP BY, but got count(1) AS `count(1)`; line 1 pos 44
{
"errorClass" : "GROUP_BY_POS_REFERS_AGG_EXPR",
"sqlState" : "42000",
"messageParameters" : {
"index" : "3",
"aggExpr" : "count(1) AS `count(1)`"
}
}


-- !query
Expand All @@ -367,7 +416,14 @@ select a, b, count(1) from data group by cube(-1, 2)
struct<>
-- !query output
org.apache.spark.sql.AnalysisException
GROUP BY position -1 is not in select list (valid range is [1, 3]); line 1 pos 46
{
"errorClass" : "GROUP_BY_POS_OUT_OF_RANGE",
"sqlState" : "42000",
"messageParameters" : {
"index" : "-1",
"size" : "3"
}
}


-- !query
Expand All @@ -376,7 +432,14 @@ select a, b, count(1) from data group by cube(1, 3)
struct<>
-- !query output
org.apache.spark.sql.AnalysisException
GROUP BY 3 refers to an expression that is or contains an aggregate function. Aggregate functions are not allowed in GROUP BY, but got count(1) AS `count(1)`; line 1 pos 49
{
"errorClass" : "GROUP_BY_POS_REFERS_AGG_EXPR",
"sqlState" : "42000",
"messageParameters" : {
"index" : "3",
"aggExpr" : "count(1) AS `count(1)`"
}
}


-- !query
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,14 @@ SELECT c, count(*) FROM test_missing_target GROUP BY 3
struct<>
-- !query output
org.apache.spark.sql.AnalysisException
GROUP BY position 3 is not in select list (valid range is [1, 2]); line 1 pos 53
{
"errorClass" : "GROUP_BY_POS_OUT_OF_RANGE",
"sqlState" : "42000",
"messageParameters" : {
"index" : "3",
"size" : "2"
}
}


-- !query
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,14 @@ SELECT udf(c), udf(count(*)) FROM test_missing_target GROUP BY 3
struct<>
-- !query output
org.apache.spark.sql.AnalysisException
GROUP BY position 3 is not in select list (valid range is [1, 2]); line 1 pos 63
{
"errorClass" : "GROUP_BY_POS_OUT_OF_RANGE",
"sqlState" : "42000",
"messageParameters" : {
"index" : "3",
"size" : "2"
}
}


-- !query
Expand Down

0 comments on commit 0d1352d

Please sign in to comment.