#127 spark 3.2.1 support (#129)
* #127 spark 3.2.1 support
* #127 minidfs cluster dependencies removal
dk1844 authored Apr 6, 2022
1 parent 9ced8b7 commit df36eb0
Showing 14 changed files with 122 additions and 83 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/build-sbt.yml
@@ -28,10 +28,10 @@ jobs:
       fail-fast: false
       matrix:
         scala: [ 2.11.12, 2.12.15 ]
-        spark: [ 2.4.8, 3.1.2 ]
+        spark: [ 2.4.8, 3.2.1 ]
         exclude:
           - scala: 2.11.12
-            spark: 3.1.2
+            spark: 3.2.1
           - scala: 2.12.15
             spark: 2.4.8
     name: SBT Spark ${{matrix.spark}} on Scala ${{matrix.scala}}
4 changes: 2 additions & 2 deletions .github/workflows/build.yml
@@ -27,10 +27,10 @@ jobs:
       fail-fast: false
       matrix:
         scala: [2.11, 2.12]
-        spark: [2.4, 3.1]
+        spark: [2.4, 3.2]
         exclude:
           - scala: 2.11
-            spark: 3.1
+            spark: 3.2
     name: Test with Spark ${{ matrix.spark }} on Scala ${{ matrix.scala }}
     steps:
       - name: Checkout code
7 changes: 0 additions & 7 deletions atum/pom.xml
@@ -76,13 +76,6 @@
       <scope>test</scope>
     </dependency>

-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-minicluster</artifactId>
-      <version>${hadoop.version}</version>
-      <scope>test</scope>
-    </dependency>
-
   </dependencies>

   <build>
atum/src/main/scala/za/co/absa/atum/core/SparkQueryExecutionListener.scala
@@ -16,20 +16,35 @@
 package za.co.absa.atum.core

 import java.io.{PrintWriter, StringWriter}

 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.log4j.LogManager
 import org.apache.spark.sql.execution.QueryExecution
+import org.apache.spark.sql.execution.datasources.{InsertIntoHadoopFsRelationCommand, SaveIntoDataSourceCommand}
 import org.apache.spark.sql.util.QueryExecutionListener
 import za.co.absa.atum.utils.ExecutionPlanUtils.log
 import za.co.absa.atum.utils.{ExecutionPlanUtils, InfoFile}

 /**
  * The class is responsible for listening to DataSet save events and outputting corresponding control measurements.
  */
 class SparkQueryExecutionListener(cf: ControlFrameworkState) extends QueryExecutionListener {
+  private val log = LogManager.getLogger("SparkQueryExecutionListener")

   override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
-    if (funcName == "save") {
+    (funcName, qe.analyzed) match {
+      case ("save", _) => writeInfoFileCommon(qe) // < spark 3.1.x
+      case ("command", saveCommand)
+        if saveCommand.isInstanceOf[SaveIntoDataSourceCommand] || saveCommand.isInstanceOf[InsertIntoHadoopFsRelationCommand] // spark 3.2+
+        => writeInfoFileCommon(qe)
+      case _ =>
+        // explanation: https://spark.apache.org/docs/latest/sql-migration-guide.html#upgrading-from-spark-sql-31-to-32
+        // "In Spark 3.2, the query executions triggered by DataFrameWriter are always named command when being sent
+        // to QueryExecutionListener. In Spark 3.1 and earlier, the name is one of save, insertInto, saveAsTable."
+    }
+
+    def writeInfoFileCommon(qe: QueryExecution) = {
       Atum.log.debug(s"SparkQueryExecutionListener.onSuccess: writing to Hadoop FS")
       writeInfoFileForQuery(qe)
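For clarity, here is a minimal, self-contained sketch of the version-tolerant dispatch this commit introduces (the class name SaveAwareListener and the onSave callback are illustrative, not part of the commit). Per the migration guide quoted above, Spark 3.1 and earlier deliver DataFrameWriter events under the name "save", while Spark 3.2+ always delivers them as "command", so the save has to be recognised from the analyzed logical plan instead:

import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.execution.datasources.{InsertIntoHadoopFsRelationCommand, SaveIntoDataSourceCommand}
import org.apache.spark.sql.util.QueryExecutionListener

// Fires onSave for DataFrameWriter saves on Spark <= 3.1 ("save") and Spark 3.2+ ("command").
class SaveAwareListener(onSave: QueryExecution => Unit) extends QueryExecutionListener {

  override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit =
    (funcName, qe.analyzed) match {
      case ("save", _) => onSave(qe) // Spark 3.1.x and earlier
      case ("command", cmd) if cmd.isInstanceOf[SaveIntoDataSourceCommand] ||
        cmd.isInstanceOf[InsertIntoHadoopFsRelationCommand] => onSave(qe) // Spark 3.2+
      case _ => // other query executions carry no save event
    }

  override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = ()
}

Registered via spark.listenerManager.register(new SaveAwareListener(qe => ...)), a listener like this fires once per DataFrameWriter save on both Spark lines.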
42 changes: 42 additions & 0 deletions atum/src/main/scala/za/co/absa/atum/utils/OperatingSystem.scala
@@ -0,0 +1,42 @@
/*
* Copyright 2018 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.atum.utils


object OperatingSystem {

// adapted from https://stackoverflow.com/a/31547504/1773349

object OperatingSystems extends Enumeration {
val WINDOWS, LINUX, MAC, SOLARIS, OTHER = Value
}

def getOsByOsName(osName: String): OperatingSystems.Value = {
import za.co.absa.atum.utils.OperatingSystem.OperatingSystems._
osName.toLowerCase match {
case os if os.contains("win") => WINDOWS
case os if os.contains("nix") || os.contains("nux") || os.contains("aix") => LINUX
case os if os.contains("mac") => MAC
case os if os.contains("sunos") => SOLARIS
case _ => OTHER
}
}

def getCurrentOs: OperatingSystems.Value = {
getOsByOsName(System.getProperty("os.name"))
}

}
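The tests below use this util to skip HDFS-permission assertions on Windows. A typical guard inside a ScalaTest spec (assuming ScalaTest's Assertions are in scope, as they are in the specs below) looks like:

import za.co.absa.atum.utils.OperatingSystem
import za.co.absa.atum.utils.OperatingSystem.OperatingSystems

// Skips the enclosing test on Windows, where the local FileSystem
// does not honour POSIX permission bits without winutils/native IO.
assume(OperatingSystem.getCurrentOs != OperatingSystems.WINDOWS)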
27 changes: 15 additions & 12 deletions atum/src/test/scala/za/co/absa/atum/utils/HdfsFileUtilsSpec.scala
@@ -16,23 +16,20 @@
 package za.co.absa.atum.utils

 import com.typesafe.config.{ConfigFactory, ConfigValueFactory}
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
 import org.apache.hadoop.fs.permission.FsPermission
+import org.apache.hadoop.fs.{FileSystem, Path}
 import org.scalatest.flatspec.AnyFlatSpec
 import org.scalatest.matchers.should.Matchers
+import za.co.absa.atum.utils.OperatingSystem.OperatingSystems

-class HdfsFileUtilsSpec extends AnyFlatSpec with Matchers with SparkTestBase with MiniDfsClusterBase {
+class HdfsFileUtilsSpec extends AnyFlatSpec with Matchers with SparkTestBase {

-  override def getConfiguration: Configuration = {
-    val cfg = new Configuration()
-    cfg.set("fs.permissions.umask-mode", "000")
-    cfg
-  }
+  implicit val fs: FileSystem = FileSystem.get(spark.sparkContext.hadoopConfiguration)

   private val Content = "Testing Content"

   "HdfsFileUtils" should "write a file to HDFS (default permissions)" in {
+    assume(OperatingSystem.getCurrentOs != OperatingSystems.WINDOWS)
     val path = new Path("/tmp/hdfs-file-utils-test/def-perms.file")

     HdfsFileUtils.getInfoFilePermissionsFromConfig() shouldBe None // key not present, testing default =>
@@ -43,20 +40,24 @@ class HdfsFileUtilsSpec
     fs.deleteOnExit(path)
   }

-  it should "write a file to HDFS (max permissions)" in {
+  it should "write a file to HDFS (max permissions 777 - default umask 022 -> 755)" in {
+    assume(OperatingSystem.getCurrentOs != OperatingSystems.WINDOWS)
+
     val path = new Path("/tmp/hdfs-file-utils-test/max-perms.file")

     val customConfig = ConfigFactory.empty()
-      .withValue("atum.hdfs.info.file.permissions", ConfigValueFactory.fromAnyRef("777"))
+      .withValue("atum.hdfs.info.file.permissions", ConfigValueFactory.fromAnyRef("755"))
     HdfsFileUtils.saveStringDataToFile(path, Content, HdfsFileUtils.getInfoFilePermissionsFromConfig(customConfig).get)

     fs.exists(path) shouldBe true
-    // For this to work, we have miniDfsCluster with umask=000. Default 022 umask would allow max fsPermissions 755
-    fs.getFileStatus(path).getPermission shouldBe new FsPermission("777")
+    // Default 022 umask allows max fsPermissions 755
+    fs.getFileStatus(path).getPermission shouldBe new FsPermission("755")
     fs.deleteOnExit(path)
   }

   it should "write a file to HDFS (min permissions)" in {
+    assume(OperatingSystem.getCurrentOs != OperatingSystems.WINDOWS)
+
     val path = new Path("/tmp/hdfs-file-utils-test/min-perms.file")
     val customConfig = ConfigFactory.empty()
       .withValue("atum.hdfs.info.file.permissions", ConfigValueFactory.fromAnyRef("000"))

@@ -68,6 +69,8 @@ class HdfsFileUtilsSpec
   }

   it should "write a file to HDFS (custom permissions)" in {
+    assume(OperatingSystem.getCurrentOs != OperatingSystems.WINDOWS)
+
     val path = new Path("/tmp/hdfs-file-utils-test/custom-perms.file")
     val customConfig = ConfigFactory.empty()
       .withValue("atum.hdfs.info.file.permissions", ConfigValueFactory.fromAnyRef("751"))
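The 777 to 755 change above is plain umask arithmetic rather than a behaviour change: without the removed MiniDFS cluster (which ran with umask 000), files are created with mode requested & ~umask, and Hadoop's default fs.permissions.umask-mode is 022. A standalone sketch of the same calculation (not part of the commit):

import org.apache.hadoop.fs.permission.FsPermission

object UmaskDemo extends App {
  // Effective mode = requested & ~umask, so a requested 777 under the
  // default 022 umask comes out as 755, which the updated test asserts.
  val requested = new FsPermission("777")
  val umask = new FsPermission("022")
  println(requested.applyUMask(umask)) // prints rwxr-xr-x, i.e. 755
}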
32 changes: 0 additions & 32 deletions atum/src/test/scala/za/co/absa/atum/utils/MiniDfsClusterBase.scala

This file was deleted.

32 changes: 32 additions & 0 deletions atum/src/test/scala/za/co/absa/atum/utils/OperatingSystemSuite.scala
@@ -0,0 +1,32 @@
/*
* Copyright 2018 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.atum.utils

import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import za.co.absa.atum.utils.OperatingSystem.OperatingSystems

class OperatingSystemSuite extends AnyFlatSpec with Matchers {

"OperatingSystem util" should "correctly find out OS" in {
OperatingSystem.getOsByOsName("Windows 10") shouldBe OperatingSystems.WINDOWS
OperatingSystem.getOsByOsName("Linux") shouldBe OperatingSystems.LINUX
OperatingSystem.getOsByOsName("Mac OS X") shouldBe OperatingSystems.MAC
OperatingSystem.getOsByOsName("SunOs") shouldBe OperatingSystems.SOLARIS

OperatingSystem.getOsByOsName("my own special os") shouldBe OperatingSystems.OTHER
}
}
2 changes: 1 addition & 1 deletion build-all.cmd
@@ -23,7 +23,7 @@ CALL :cross_build 2.11 2.4
 IF %ERRORLEVEL% NEQ 0 GOTO end
 CALL :cross_build 2.12 2.4
 IF %ERRORLEVEL% NEQ 0 GOTO end
-CALL :cross_build 2.12 3.1
+CALL :cross_build 2.12 3.2
 IF %ERRORLEVEL% NEQ 0 GOTO end

 ECHO ===============================================================================
2 changes: 1 addition & 1 deletion build-all.sh
@@ -39,7 +39,7 @@ find "$BASE_DIR" -name target -type d -exec rm -rf {} \;

 cross_build 2.11 2.4
 cross_build 2.12 2.4
-cross_build 2.12 3.1
+cross_build 2.12 3.2

 echo "==============================================================================="
 echo Restoring version
@@ -52,7 +52,7 @@ object SampleMeasurements1 extends Eventually {
       .write.mode(SaveMode.Overwrite)
       .parquet("data/output/stage1_job_results")

-    eventually(timeout(scaled(10.seconds)), interval(scaled(500.millis))) {
+    eventually(timeout(scaled(20.seconds)), interval(scaled(500.millis))) {
       if (!fs.exists(new Path("data/output/stage1_job_results/_INFO"))) {
         throw new Exception("_INFO file not found at data/output/stage1_job_results")
       }
@@ -53,7 +53,7 @@ class HdfsInfoIntegrationSuite
     df.write.mode(SaveMode.Overwrite)
       .parquet(outputPath)

-    eventually(timeout(scaled(10.seconds)), interval(scaled(500.millis))) {
+    eventually(timeout(scaled(20.seconds)), interval(scaled(500.millis))) {
       if (!Files.exists(Paths.get(outputPath))) {
         throw new Exception("_INFO file not found at " + outputPath)
       }

@@ -89,7 +89,7 @@ class HdfsInfoIntegrationSuite
     expectedPaths.foreach { expectedPath =>
       log.info(s"Checking $expectedPath to contain expected values")

-      val infoControlMeasures = eventually(timeout(scaled(10.seconds)), interval(scaled(2.seconds))) {
+      val infoControlMeasures = eventually(timeout(scaled(20.seconds)), interval(scaled(2.seconds))) {
         log.info(s"Reading $expectedPath")
         val infoContentJson = LocalFsTestUtils.readFileAsString(expectedPath)
         ControlMeasuresParser.fromJson(infoContentJson)
9 changes: 4 additions & 5 deletions pom.xml
@@ -128,11 +128,10 @@
     <mockito.scala.version>1.15.0</mockito.scala.version>
     <commons.version>0.0.27</commons.version>
     <typesafe.config.version>1.4.1</typesafe.config.version>
-    <hadoop.version>2.8.5</hadoop.version>

     <!-- Spark versions -->
     <spark-24.version>2.4.8</spark-24.version>
-    <spark-31.version>3.1.2</spark-31.version>
+    <spark-32.version>3.2.1</spark-32.version>

     <spark.version>${spark-24.version}</spark.version>

@@ -342,10 +341,10 @@
       </properties>
     </profile>
     <profile>
-      <id>spark-3.1</id>
+      <id>spark-3.2</id>
       <properties>
-        <spark.version>${spark-31.version}</spark.version>
-        <json4s.version>3.7.0-M5</json4s.version>
+        <spark.version>${spark-32.version}</spark.version>
+        <json4s.version>3.7.0-M11</json4s.version>
       </properties>
     </profile>
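With the renamed profile, a Spark 3.2 build would presumably be selected with mvn clean install -Pspark-3.2, mirroring the previous -Pspark-3.1 invocation; the json4s bump to 3.7.0-M11 keeps it aligned with the json4s version Spark 3.2.1 itself depends on.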
19 changes: 3 additions & 16 deletions project/Dependencies.scala
@@ -19,13 +19,10 @@ object Dependencies {

   object Versions {
     val spark2 = "2.4.8"
-    val spark3 = "3.1.2"
+    val spark3 = "3.2.1"

     val json4s_spark2 = "3.5.3"
-    val json4s_spark3 = "3.7.0-M5"
-
-    val hadoop2 = "2.8.5"
-    val hadoop3 = "3.2.2"
+    val json4s_spark3 = "3.7.0-M11"

     val absaCommons = "0.0.27"
     val typesafeConfig = "1.4.1"

@@ -34,8 +31,7 @@
     val specs2 = "2.5"
     val aws = "2.17.85"

-    val jUnit = "4.11" // for hdfs minicluster
-    val apacheCommonsLang3 = "3.5"
+    val apacheCommonsLang3 = "3.12.0"
     val commonsConfiguration = "1.6"
   }

@@ -79,11 +75,6 @@
   lazy val mockitoScala = "org.mockito" %% "mockito-scala" % Versions.mockitoScala % Test
   lazy val mockitoScalaScalatest = "org.mockito" %% "mockito-scala-scalatest" % Versions.mockitoScala % Test

-  val hadoopMinicluster = moduleByScala("org.apache.hadoop" % "hadoop-minicluster" % _ % Test)(Versions.hadoop2, Versions.hadoop3) _
-  val hadoopHdfs = moduleByScala("org.apache.hadoop" % "hadoop-hdfs" % _ % Test classifier "tests")(Versions.hadoop2, Versions.hadoop3) _
-  val hadoopComons = moduleByScala("org.apache.hadoop" % "hadoop-common" % _ % Test classifier "tests")(Versions.hadoop2, Versions.hadoop3) _
-  lazy val jUnit = "junit" % "junit" % Versions.jUnit % Test
-
   lazy val scalaTestProvided = "org.scalatest" %% "scalatest" % Versions.scalatest % Provided
   lazy val specs2core = "org.specs2" %% "specs2-core" % Versions.specs2 % Test

@@ -110,10 +101,6 @@

     mockitoScala,
     mockitoScalaScalatest,
-    hadoopMinicluster(scalaVersion),
-    hadoopHdfs(scalaVersion),
-    hadoopComons(scalaVersion),
-    jUnit
   )

   lazy val examplesDependencies: Seq[ModuleID] = Seq(
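For context, a small sketch of the version-switched dependency pattern that Dependencies.scala follows, reduced to its core (the object name, the SPARK_VERSION system-property mechanism, and json4s-native as the concrete artifact are illustrative assumptions, not taken from this commit):

import sbt._

object DependenciesSketch {
  // Spark version chosen at build time, e.g. via -DSPARK_VERSION=3.2.1 (assumed mechanism).
  val sparkVersion: String = sys.props.getOrElse("SPARK_VERSION", "3.2.1")

  // json4s must line up with the version shipped by Spark itself:
  // 3.5.3 for the Spark 2.4 line, 3.7.0-M11 for Spark 3.2.
  val json4sVersion: String = if (sparkVersion.startsWith("2.")) "3.5.3" else "3.7.0-M11"

  val sparkCore: ModuleID = "org.apache.spark" %% "spark-core" % sparkVersion % Provided
  val json4sNative: ModuleID = "org.json4s" %% "json4s-native" % json4sVersion
}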
