From 56c76bc2d4563593edce062a563603fe63e5a431 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Thu, 27 Sep 2018 10:18:38 -0700 Subject: [PATCH] [LIVY-511][LIVY-512] Remove support for old Spark, Scala versions. This change restricts Livy support to Spark 2.2+ and Scala 2.11. Both changes are made together because by supporting Spark 2.2+ only, it becomes impossible to test Scala 2.10. As part of the change, a lot of code that used reflection to support different versions of Spark could be cleaned up and directly call Spark APIs. The Scala 2.10 parts of the builds also have been removed, but the actual support for building and running with different Scala versions (and related tests) have been left as is. This will allow us to support 2.12 in the future. This change intentionally does not touch the public API (the "api/" module). There are things that could be cleaned up now that Spark 1.x is not supported, but that would mean an API breakage so I chose to leave those alone for now. The test matrix and build profiles have also been simplified a lot. There are now two profiles to choose from (for Spark 2.2 and 2.3); integration tests can be run against a different version of Spark by running just the integration test module with the desired profile. Tested with Spark 2.2 and 2.3, and also by building against 2.2 and running integration tests against 2.3. Author: Marcelo Vanzin Closes #112 from vanzin/LIVY-511. --- .travis.yml | 24 +-- README.md | 19 +- assembly/assembly.xml | 7 - assembly/pom.xml | 6 - conf/livy.conf.template | 4 +- core/scala-2.10/pom.xml | 53 ----- coverage/pom.xml | 18 -- .../java/org/apache/livy/examples/PiApp.java | 2 +- .../org/apache/livy/test/InteractiveIT.scala | 9 +- pom.xml | 147 +------------- repl/scala-2.10/pom.xml | 41 ---- .../apache/livy/repl/SparkInterpreter.scala | 182 ------------------ .../livy/repl/SparkInterpreterSpec.scala | 86 --------- .../apache/livy/repl/SparkInterpreter.scala | 55 +----- .../org/apache/livy/repl/SQLInterpreter.scala | 25 +-- .../apache/livy/repl/SQLInterpreterSpec.scala | 26 +-- .../org/apache/livy/rsc/ContextLauncher.java | 9 +- .../livy/rsc/driver/JobContextImpl.java | 12 +- .../apache/livy/rsc/driver/SparkEntries.java | 46 ++--- scala-api/scala-2.10/pom.xml | 38 ---- scala-api/src/main/resources/build.marker | 0 .../main/scala/org/apache/livy/LivyConf.scala | 3 +- .../livy/server/batch/BatchSession.scala | 3 - .../interactive/InteractiveSession.scala | 21 +- .../apache/livy/utils/LivySparkUtils.scala | 10 +- .../BaseInteractiveServletSpec.scala | 4 +- .../interactive/InteractiveSessionSpec.scala | 8 +- .../livy/utils/LivySparkUtilsSuite.scala | 52 +---- .../livy/thriftserver/rpc/RpcClient.scala | 76 +++----- 29 files changed, 113 insertions(+), 873 deletions(-) delete mode 100644 core/scala-2.10/pom.xml delete mode 100644 repl/scala-2.10/pom.xml delete mode 100644 repl/scala-2.10/src/main/scala/org/apache/livy/repl/SparkInterpreter.scala delete mode 100644 repl/scala-2.10/src/test/scala/org/apache/livy/repl/SparkInterpreterSpec.scala delete mode 100644 scala-api/scala-2.10/pom.xml delete mode 100644 scala-api/src/main/resources/build.marker diff --git a/.travis.yml b/.travis.yml index d38cf4f42..a72a8751c 100644 --- a/.travis.yml +++ b/.travis.yml @@ -19,24 +19,16 @@ sudo: required dist: trusty language: scala -env: - matrix: - - MVN_FLAG='-DskipTests' - - MVN_FLAG='-Pspark-2.0-it -DskipTests' - - MVN_FLAG='-Pspark-2.1-it -DskipTests' - - MVN_FLAG='-Pspark-1.6 -DskipITs' - - MVN_FLAG='-Pspark-2.0 -DskipITs' - - MVN_FLAG='-Pspark-2.1 -DskipITs' - matrix: include: - # Spark 2.2+ will only be verified using JDK8 - # Thriftserver requires JDK8 - - env: MVN_FLAG='-Pthriftserver -Pspark-2.2-it -DskipTests' - - env: MVN_FLAG='-Pthriftserver -Pspark-2.2 -DskipITs' - - env: MVN_FLAG='-Pthriftserver -Pspark-2.3-it -DskipTests' - - env: MVN_FLAG='-Pthriftserver -Pspark-2.3 -DskipITs' - + - name: "Spark 2.2 Unit Tests" + env: MVN_FLAG='-Pthriftserver -DskipITs' + - name: "Spark 2.2 ITs" + env: MVN_FLAG='-Pthriftserver -DskipTests' + - name: "Spark 2.3 Unit Tests" + env: MVN_FLAG='-Pspark-2.3 -Pthriftserver -DskipITs' + - name: "Spark 2.3 ITs" + env: MVN_FLAG='-Pspark-2.3 -Pthriftserver -DskipTests' jdk: - oraclejdk8 diff --git a/README.md b/README.md index ff6ae163d..9d7df7093 100644 --- a/README.md +++ b/README.md @@ -57,12 +57,8 @@ Required python packages for building Livy: To run Livy, you will also need a Spark installation. You can get Spark releases at https://spark.apache.org/downloads.html. -Livy requires at least Spark 1.6 and supports both Scala 2.10 and 2.11 builds of Spark, Livy -will automatically pick repl dependencies through detecting the Scala version of Spark. - -Livy also supports Spark 2.0+ for both interactive and batch submission, you could seamlessly -switch to different versions of Spark through ``SPARK_HOME`` configuration, without needing to -rebuild Livy. +Livy requires Spark 2.2 or 2.3. You can switch to a different version of Spark by setting the +``SPARK_HOME`` environment variable in the Livy server process, without needing to rebuild Livy. ## Building Livy @@ -75,8 +71,9 @@ cd livy mvn package ``` -By default Livy is built against Apache Spark 1.6.2, but the version of Spark used when running -Livy does not need to match the version used to build Livy. Livy internally uses reflection to -mitigate the gaps between different Spark versions, also Livy package itself does not -contain a Spark distribution, so it will work with any supported version of Spark (Spark 1.6+) -without needing to rebuild against specific version of Spark. +By default Livy is built against Apache Spark 2.2.0, but the version of Spark used when running +Livy does not need to match the version used to build Livy. Livy internally handles the differences +between different Spark versions. + +The Livy package itself does not contain a Spark distribution. It will work with any supported +version of Spark without needing to rebuild. diff --git a/assembly/assembly.xml b/assembly/assembly.xml index f63fc0b25..b4aa49c2b 100644 --- a/assembly/assembly.xml +++ b/assembly/assembly.xml @@ -54,13 +54,6 @@ * - - ${project.parent.basedir}/repl/scala-2.10/target/jars - ${assembly.name}/repl_2.10-jars - - * - - ${project.parent.basedir}/repl/scala-2.11/target/jars ${assembly.name}/repl_2.11-jars diff --git a/assembly/pom.xml b/assembly/pom.xml index 470f24d60..fddef025a 100644 --- a/assembly/pom.xml +++ b/assembly/pom.xml @@ -41,12 +41,6 @@ ${project.version} - - ${project.groupId} - livy-repl_2.10 - ${project.version} - - ${project.groupId} livy-repl_2.11 diff --git a/conf/livy.conf.template b/conf/livy.conf.template index fede70aa4..707e9c931 100644 --- a/conf/livy.conf.template +++ b/conf/livy.conf.template @@ -71,8 +71,8 @@ # Comma-separated list of Livy REPL jars. By default Livy will upload jars from its installation # directory every time a session is started. By caching these files in HDFS, for example, startup # time of sessions on YARN can be reduced. Please list all the repl dependencies including -# livy-repl_2.10 and livy-repl_2.11 jars, Livy will automatically pick the right dependencies in -# session creation. +# Scala version-specific livy-repl jars, Livy will automatically pick the right dependencies +# during session creation. # livy.repl.jars = # Location of PySpark archives. By default Livy will upload the file from SPARK_HOME, but diff --git a/core/scala-2.10/pom.xml b/core/scala-2.10/pom.xml deleted file mode 100644 index d5ee46deb..000000000 --- a/core/scala-2.10/pom.xml +++ /dev/null @@ -1,53 +0,0 @@ - - - - 4.0.0 - org.apache.livy - livy-core_2.10 - 0.6.0-incubating-SNAPSHOT - jar - - - org.apache.livy - livy-core-parent - 0.6.0-incubating-SNAPSHOT - ../pom.xml - - - - ${scala-2.10.version} - 2.10 - - - - - - org.apache.maven.plugins - maven-jar-plugin - - - - test-jar - - - - - - - - diff --git a/coverage/pom.xml b/coverage/pom.xml index 26501bbfa..931524fbb 100644 --- a/coverage/pom.xml +++ b/coverage/pom.xml @@ -50,24 +50,12 @@ ${project.version} - - ${project.groupId} - livy-core_2.10 - ${project.version} - - ${project.groupId} livy-core_2.11 ${project.version} - - ${project.groupId} - livy-repl_2.10 - ${project.version} - - ${project.groupId} livy-repl_2.11 @@ -86,12 +74,6 @@ ${project.version} - - ${project.groupId} - livy-scala-api_2.10 - ${project.version} - - ${project.groupId} livy-scala-api_2.11 diff --git a/examples/src/main/java/org/apache/livy/examples/PiApp.java b/examples/src/main/java/org/apache/livy/examples/PiApp.java index 638f3b209..b7b65974b 100644 --- a/examples/src/main/java/org/apache/livy/examples/PiApp.java +++ b/examples/src/main/java/org/apache/livy/examples/PiApp.java @@ -65,7 +65,7 @@ public Integer call(Integer v1, Integer v2) { /** * Example execution: - * java -cp /pathTo/spark-core_2.10-*version*.jar:/pathTo/livy-api-*version*.jar: + * java -cp /pathTo/spark-core_2.11-*version*.jar:/pathTo/livy-api-*version*.jar: * /pathTo/livy-client-http-*version*.jar:/pathTo/livy-examples-*version*.jar * org.apache.livy.examples.PiApp http://livy-host:8998 2 */ diff --git a/integration-test/src/test/scala/org/apache/livy/test/InteractiveIT.scala b/integration-test/src/test/scala/org/apache/livy/test/InteractiveIT.scala index 853290a78..689195cb9 100644 --- a/integration-test/src/test/scala/org/apache/livy/test/InteractiveIT.scala +++ b/integration-test/src/test/scala/org/apache/livy/test/InteractiveIT.scala @@ -44,15 +44,8 @@ class InteractiveIT extends BaseIntegrationTestSuite { s.run("throw new IllegalStateException()") .verifyError(evalue = ".*java\\.lang\\.IllegalStateException.*") - // Check if we're running with Spark1 or Spark2, in Spark1 we will use SQLContext, whereas - // for Spark2 we will use SparkSession. - val entry = if (s.run("spark").result().isLeft) { - "spark" - } else { - "sqlContext" - } // Verify query submission - s.run(s"""val df = $entry.createDataFrame(Seq(("jerry", 20), ("michael", 21)))""") + s.run(s"""val df = spark.createDataFrame(Seq(("jerry", 20), ("michael", 21)))""") .verifyResult(".*" + Pattern.quote("df: org.apache.spark.sql.DataFrame") + ".*") s.run("df.registerTempTable(\"people\")").result() s.run("SELECT * FROM people", Some(SQL)).verifyResult(".*\"jerry\",20.*\"michael\",21.*") diff --git a/pom.xml b/pom.xml index b49c52449..02c71cb4b 100644 --- a/pom.xml +++ b/pom.xml @@ -80,8 +80,7 @@ 2.7.3 compile - 1.6.2 - 1.6.2 + 2.2.0 ${spark.scala-2.11.version} 3.0.0 1.9 @@ -91,18 +90,16 @@ 2.9.5 3.1.0 9.3.8.v20160314 - 3.2.10 + 3.2.11 4.11 0.9.3 2.22 3.1.0 1.9.5 4.0.37.Final - 4.0.37.Final ${netty.spark-2.11.version} UTF-8 - 0.9 - 2.10.4 + 0.10.7 2.11.12 2.11 ${scala-2.11.version} @@ -113,9 +110,9 @@ ${user.dir} ${execution.root}/dev/spark - https://d3kbcqa49mib13.cloudfront.net/spark-1.6.2-bin-hadoop2.6.tgz + https://d3kbcqa49mib13.cloudfront.net/spark-2.2.0-bin-hadoop2.7.tgz - spark-1.6.2-bin-hadoop2.6 + spark-2.2.0-bin-hadoop2.7 ${basedir}/target @@ -195,18 +192,15 @@ client-common client-http core - core/scala-2.10 core/scala-2.11 coverage examples python-api repl - repl/scala-2.10 repl/scala-2.11 rsc scala scala-api - scala-api/scala-2.10 scala-api/scala-2.11 server test-lib @@ -705,6 +699,8 @@ true ${spark.home} ${livy.log.dir} + ${spark.version} + ${scala.binary.version} true @@ -1031,141 +1027,12 @@ - - spark-1.6 - - - !spark-2.0 - - - - 1.6.2 - - - - - spark-2.0 - - - spark-2.0 - - - - 2.0.1 - 2.0.1 - ${spark.scala-2.11.version} - 0.10.3 - 3.2.11 - - https://d3kbcqa49mib13.cloudfront.net/spark-2.0.1-bin-hadoop2.7.tgz - - spark-2.0.1-bin-hadoop2.7 - - - - - spark-2.0-it - - - spark-2.0-it - - - - - https://d3kbcqa49mib13.cloudfront.net/spark-2.0.1-bin-hadoop2.7.tgz - - spark-2.0.1-bin-hadoop2.7 - - - - - spark-2.1 - - - spark-2.1 - - - - 2.1.0 - 2.1.0 - ${spark.scala-2.11.version} - 0.10.4 - 3.2.11 - - - - - spark-2.1-it - - - spark-2.1-it - - - - - https://d3kbcqa49mib13.cloudfront.net/spark-2.1.0-bin-hadoop2.7.tgz - - spark-2.1.0-bin-hadoop2.7 - - - - - spark-2.2 - - - spark-2.2 - - - - 2.2.0 - 2.2.0 - ${spark.scala-2.11.version} - 0.10.4 - 3.2.11 - - - - - spark-2.2-it - - - spark-2.2-it - - - - - https://d3kbcqa49mib13.cloudfront.net/spark-2.2.0-bin-hadoop2.7.tgz - - spark-2.2.0-bin-hadoop2.7 - - - spark-2.3 - - - spark-2.3 - - 2.3.1 - 2.2.0 ${spark.scala-2.11.version} 4.1.17.Final - 4.0.37.Final - 0.10.7 - 3.2.11 - - - - - spark-2.3-it - - - spark-2.3-it - - - http://mirrors.advancedhosters.com/apache/spark/spark-2.3.1/spark-2.3.1-bin-hadoop2.7.tgz diff --git a/repl/scala-2.10/pom.xml b/repl/scala-2.10/pom.xml deleted file mode 100644 index fc55b614e..000000000 --- a/repl/scala-2.10/pom.xml +++ /dev/null @@ -1,41 +0,0 @@ - - - - 4.0.0 - org.apache.livy - livy-repl_2.10 - 0.6.0-incubating-SNAPSHOT - jar - - - org.apache.livy - livy-repl-parent - 0.6.0-incubating-SNAPSHOT - ../pom.xml - - - - ${scala-2.10.version} - 2.10 - ${spark.scala-2.10.version} - ${netty.spark-2.10.version} - - - diff --git a/repl/scala-2.10/src/main/scala/org/apache/livy/repl/SparkInterpreter.scala b/repl/scala-2.10/src/main/scala/org/apache/livy/repl/SparkInterpreter.scala deleted file mode 100644 index e86d47d7b..000000000 --- a/repl/scala-2.10/src/main/scala/org/apache/livy/repl/SparkInterpreter.scala +++ /dev/null @@ -1,182 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.livy.repl - -import java.io._ -import java.net.URLClassLoader -import java.nio.file.Paths - -import scala.tools.nsc.Settings -import scala.tools.nsc.interpreter.JPrintWriter -import scala.tools.nsc.interpreter.Results.Result -import scala.util.{Failure, Success, Try} - -import org.apache.spark.SparkConf -import org.apache.spark.repl.SparkIMain -import org.apache.spark.repl.SparkJLineCompletion - -import org.apache.livy.rsc.driver.SparkEntries - -/** - * This represents a Spark interpreter. It is not thread safe. - */ -class SparkInterpreter(protected override val conf: SparkConf) extends AbstractSparkInterpreter { - - private var sparkIMain: SparkIMain = _ - - override def start(): Unit = { - require(sparkIMain == null) - - val settings = new Settings() - settings.embeddedDefaults(Thread.currentThread().getContextClassLoader()) - settings.usejavacp.value = true - - sparkIMain = new SparkIMain(settings, new JPrintWriter(outputStream, true)) - sparkIMain.initializeSynchronous() - - // Spark 1.6 does not have "classServerUri"; instead, the local directory where class files - // are stored needs to be registered in SparkConf. See comment in - // SparkILoop::createSparkContext(). - Try(sparkIMain.getClass().getMethod("classServerUri")) match { - case Success(method) => - method.setAccessible(true) - conf.set("spark.repl.class.uri", method.invoke(sparkIMain).asInstanceOf[String]) - - case Failure(_) => - val outputDir = sparkIMain.getClass().getMethod("getClassOutputDirectory") - outputDir.setAccessible(true) - conf.set("spark.repl.class.outputDir", - outputDir.invoke(sparkIMain).asInstanceOf[File].getAbsolutePath()) - } - - restoreContextClassLoader { - // Call sparkIMain.setContextClassLoader() to make sure SparkContext and repl are using the - // same ClassLoader. Otherwise if someone defined a new class in interactive shell, - // SparkContext cannot see them and will result in job stage failure. - val setContextClassLoaderMethod = sparkIMain.getClass().getMethod("setContextClassLoader") - setContextClassLoaderMethod.setAccessible(true) - setContextClassLoaderMethod.invoke(sparkIMain) - - // With usejavacp=true, the Scala interpreter looks for jars under System Classpath. But it - // doesn't look for jars added to MutableURLClassLoader. Thus extra jars are not visible to - // the interpreter. SparkContext can use them via JVM ClassLoaders but users cannot import - // them using Scala import statement. - // - // For instance: If we import a package using SparkConf: - // "spark.jars.packages": "com.databricks:spark-csv_2.10:1.4.0" - // then "import com.databricks.spark.csv._" in the interpreter, it will throw an error. - // - // Adding them to the interpreter manually to fix this issue. - var classLoader = Thread.currentThread().getContextClassLoader - while (classLoader != null) { - if (classLoader.getClass.getCanonicalName == "org.apache.spark.util.MutableURLClassLoader") - { - val extraJarPath = classLoader.asInstanceOf[URLClassLoader].getURLs() - // Check if the file exists. Otherwise an exception will be thrown. - .filter { u => u.getProtocol == "file" && new File(u.getPath).isFile } - // Livy rsc and repl are also in the extra jars list. Filter them out. - .filterNot { u => Paths.get(u.toURI).getFileName.toString.startsWith("livy-") } - // Some bad spark packages depend on the wrong version of scala-reflect. Blacklist it. - .filterNot { u => - Paths.get(u.toURI).getFileName.toString.contains("org.scala-lang_scala-reflect") - } - - extraJarPath.foreach { p => debug(s"Adding $p to Scala interpreter's class path...") } - sparkIMain.addUrlsToClassPath(extraJarPath: _*) - classLoader = null - } else { - classLoader = classLoader.getParent - } - } - - postStart() - } - } - - override protected def bind(name: String, - tpe: String, - value: Object, - modifier: List[String]): Unit = { - sparkIMain.beQuietDuring { - sparkIMain.bind(name, tpe, value, modifier) - } - } - - override def close(): Unit = synchronized { - super.close() - - if (sparkIMain != null) { - sparkIMain.close() - sparkIMain = null - } - } - - override protected def isStarted(): Boolean = { - sparkIMain != null - } - - override protected def interpret(code: String): Result = { - sparkIMain.interpret(code) - } - - override protected def completeCandidates(code: String, cursor: Int) : Array[String] = { - val completer = new SparkJLineCompletion(sparkIMain) - completer.completer().complete(code, cursor).candidates.toArray - } - - override protected[repl] def parseError(stdout: String): (String, Seq[String]) = { - // An example of Scala 2.10 runtime exception error message: - // java.lang.Exception: message - // at $iwC$$iwC$$iwC$$iwC$$iwC.error(:25) - // at $iwC$$iwC$$iwC.error2(:27) - // at $iwC$$iwC.(:41) - // at $iwC.(:43) - // at (:45) - // at .(:49) - // at .() - // at .(:7) - // at .() - // at $print() - // at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) - // at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) - // ... - - val (ename, traceback) = super.parseError(stdout) - - // Remove internal frames. - val startOfInternalFrames = traceback.indexWhere(_.contains("$iwC$$iwC.")) - var endOfInternalFrames = traceback.indexWhere(!_.trim.startsWith("at"), startOfInternalFrames) - if (endOfInternalFrames == -1) { - endOfInternalFrames = traceback.length - } - - val cleanedTraceback = if (startOfInternalFrames == -1) { - traceback - } else { - traceback.view.zipWithIndex - .filterNot { z => z._2 >= startOfInternalFrames && z._2 < endOfInternalFrames } - .map { _._1.replaceAll("(\\$iwC\\$)*\\$iwC", "") } - } - - (ename, cleanedTraceback) - } - - override protected def valueOfTerm(name: String): Option[Any] = { - sparkIMain.valueOfTerm(name) - } -} diff --git a/repl/scala-2.10/src/test/scala/org/apache/livy/repl/SparkInterpreterSpec.scala b/repl/scala-2.10/src/test/scala/org/apache/livy/repl/SparkInterpreterSpec.scala deleted file mode 100644 index e3c849dcb..000000000 --- a/repl/scala-2.10/src/test/scala/org/apache/livy/repl/SparkInterpreterSpec.scala +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.livy.repl - -import org.scalatest._ - -import org.apache.livy.LivyBaseUnitTestSuite - -class SparkInterpreterSpec extends FunSpec with Matchers with LivyBaseUnitTestSuite { - describe("SparkInterpreter") { - val interpreter = new SparkInterpreter(null) - - it("should parse Scala compile error.") { - // Regression test for LIVY-260. - val error = - """:27: error: type mismatch; - | found : Int - | required: String - | sc.setJobGroup(groupName, groupName, true) - | ^ - |:27: error: type mismatch; - | found : Int - | required: String - | sc.setJobGroup(groupName, groupName, true) - | ^ - |""".stripMargin - - val expectedTraceback = AbstractSparkInterpreter.KEEP_NEWLINE_REGEX.split( - """ found : Int - | required: String - | sc.setJobGroup(groupName, groupName, true) - | ^ - |:27: error: type mismatch; - | found : Int - | required: String - | sc.setJobGroup(groupName, groupName, true) - | ^ - |""".stripMargin) - - val (ename, traceback) = interpreter.parseError(error) - ename shouldBe ":27: error: type mismatch;" - traceback shouldBe expectedTraceback - } - - it("should parse Scala runtime error and remove internal frames.") { - val error = - """java.lang.RuntimeException: message - | at $iwC$$iwC$$iwC$$iwC$$iwC.error(:25) - | at $iwC$$iwC$$iwC.error2(:27) - | at $iwC$$iwC.(:41) - | at $iwC.(:43) - | at (:45) - | at .(:49) - | at .() - | at .(:7) - | at .() - | at $print() - | at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) - |""".stripMargin - - val expectedTraceback = AbstractSparkInterpreter.KEEP_NEWLINE_REGEX.split( - """ at .error(:25) - | at .error2(:27) - |""".stripMargin) - - val (ename, traceback) = interpreter.parseError(error) - ename shouldBe "java.lang.RuntimeException: message" - traceback shouldBe expectedTraceback - } - } -} diff --git a/repl/scala-2.11/src/main/scala/org/apache/livy/repl/SparkInterpreter.scala b/repl/scala-2.11/src/main/scala/org/apache/livy/repl/SparkInterpreter.scala index 884df5ccb..7de285914 100644 --- a/repl/scala-2.11/src/main/scala/org/apache/livy/repl/SparkInterpreter.scala +++ b/repl/scala-2.11/src/main/scala/org/apache/livy/repl/SparkInterpreter.scala @@ -35,12 +35,11 @@ import org.apache.spark.repl.SparkILoop import org.apache.livy.rsc.driver.SparkEntries /** - * Scala 2.11 version of SparkInterpreter + * This represents a Spark interpreter. It is not thread safe. */ class SparkInterpreter(protected override val conf: SparkConf) extends AbstractSparkInterpreter { private var sparkILoop: SparkILoop = _ - private var sparkHttpServer: Object = _ override def start(): Unit = { require(sparkILoop == null) @@ -50,12 +49,6 @@ class SparkInterpreter(protected override val conf: SparkConf) extends AbstractS outputDir.deleteOnExit() conf.set("spark.repl.class.outputDir", outputDir.getAbsolutePath) - // Only Spark1 requires to create http server, Spark2 removes HttpServer class. - startHttpServer(outputDir).foreach { case (server, uri) => - sparkHttpServer = server - conf.set("spark.repl.class.uri", uri) - } - val settings = new Settings() settings.processArguments(List("-Yrepl-class-based", "-Yrepl-outdir", s"${outputDir.getAbsolutePath}"), true) @@ -103,13 +96,6 @@ class SparkInterpreter(protected override val conf: SparkConf) extends AbstractS sparkILoop.closeInterpreter() sparkILoop = null } - - if (sparkHttpServer != null) { - val method = sparkHttpServer.getClass.getMethod("stop") - method.setAccessible(true) - method.invoke(sparkHttpServer) - sparkHttpServer = null - } } override protected def isStarted(): Boolean = { @@ -146,43 +132,4 @@ class SparkInterpreter(protected override val conf: SparkConf) extends AbstractS sparkILoop.bind(name, tpe, value, modifier) } } - - private def startHttpServer(outputDir: File): Option[(Object, String)] = { - try { - val httpServerClass = Class.forName("org.apache.spark.HttpServer") - val securityManager = { - val constructor = Class.forName("org.apache.spark.SecurityManager") - .getConstructor(classOf[SparkConf]) - constructor.setAccessible(true) - constructor.newInstance(conf).asInstanceOf[Object] - } - val httpServerConstructor = httpServerClass - .getConstructor(classOf[SparkConf], - classOf[File], - Class.forName("org.apache.spark.SecurityManager"), - classOf[Int], - classOf[String]) - httpServerConstructor.setAccessible(true) - // Create Http Server - val port = conf.getInt("spark.replClassServer.port", 0) - val server = httpServerConstructor - .newInstance(conf, outputDir, securityManager, new Integer(port), "HTTP server") - .asInstanceOf[Object] - - // Start Http Server - val startMethod = server.getClass.getMethod("start") - startMethod.setAccessible(true) - startMethod.invoke(server) - - // Get uri of this Http Server - val uriMethod = server.getClass.getMethod("uri") - uriMethod.setAccessible(true) - val uri = uriMethod.invoke(server).asInstanceOf[String] - Some((server, uri)) - } catch { - // Spark 2.0+ removed HttpServer, so return null instead. - case NonFatal(e) => - None - } - } } diff --git a/repl/src/main/scala/org/apache/livy/repl/SQLInterpreter.scala b/repl/src/main/scala/org/apache/livy/repl/SQLInterpreter.scala index afef09b75..5a7b60638 100644 --- a/repl/src/main/scala/org/apache/livy/repl/SQLInterpreter.scala +++ b/repl/src/main/scala/org/apache/livy/repl/SQLInterpreter.scala @@ -23,6 +23,7 @@ import scala.util.control.NonFatal import org.apache.spark.SparkConf import org.apache.spark.sql.Row +import org.apache.spark.sql.SparkSession import org.json4s._ import org.json4s.JsonDSL._ import org.json4s.jackson.JsonMethods._ @@ -67,7 +68,7 @@ class SQLInterpreter( private implicit def formats = DefaultFormats - private var spark: AnyRef = null + private var spark: SparkSession = null private val maxResult = rscConf.getInt(RSCConf.Entry.SQL_NUM_ROWS) @@ -75,28 +76,16 @@ class SQLInterpreter( override def start(): Unit = { require(!sparkEntries.sc().sc.isStopped) - - val sparkVersion = sparkConf.getInt("spark.livy.spark_major_version", 1) - if (sparkVersion == 1) { - spark = Option(sparkEntries.hivectx()).getOrElse(sparkEntries.sqlctx()) - } else { - spark = sparkEntries.sparkSession() - } + spark = sparkEntries.sparkSession() } override protected[repl] def execute(code: String): Interpreter.ExecuteResponse = { try { - val result = spark.getClass.getMethod("sql", classOf[String]).invoke(spark, code) - - // Get the schema info - val schema = result.getClass.getMethod("schema").invoke(result) - val jsonString = schema.getClass.getMethod("json").invoke(schema).asInstanceOf[String] - val jSchema = parse(jsonString) + val result = spark.sql(code) + val schema = parse(result.schema.json) // Get the row data - val rows = result.getClass.getMethod("take", classOf[Int]) - .invoke(result, maxResult: java.lang.Integer) - .asInstanceOf[Array[Row]] + val rows = result.take(maxResult) .map { _.toSeq.map { // Convert java BigDecimal type to Scala BigDecimal, because current version of @@ -109,7 +98,7 @@ class SQLInterpreter( val jRows = Extraction.decompose(rows) Interpreter.ExecuteSuccess( - APPLICATION_JSON -> (("schema" -> jSchema) ~ ("data" -> jRows))) + APPLICATION_JSON -> (("schema" -> schema) ~ ("data" -> jRows))) } catch { case e: InvocationTargetException => warn(s"Fail to execute query $code", e.getTargetException) diff --git a/repl/src/test/scala/org/apache/livy/repl/SQLInterpreterSpec.scala b/repl/src/test/scala/org/apache/livy/repl/SQLInterpreterSpec.scala index d2c12eb00..37c959410 100644 --- a/repl/src/test/scala/org/apache/livy/repl/SQLInterpreterSpec.scala +++ b/repl/src/test/scala/org/apache/livy/repl/SQLInterpreterSpec.scala @@ -54,16 +54,14 @@ class SQLInterpreterSpec extends BaseInterpreterSpec { |SELECT * FROM people """.stripMargin) - // In Spark 1.6, 2.0, 2.2 the "nullable" field of column "age" is false. In spark 2.1, this - // field is true. - val expectedResult = (nullable: Boolean) => { + resp1 should equal( Interpreter.ExecuteSuccess( APPLICATION_JSON -> (("schema" -> (("type" -> "struct") ~ ("fields" -> List( ("name" -> "name") ~ ("type" -> "string") ~ ("nullable" -> true) ~ ("metadata" -> List()), - ("name" -> "age") ~ ("type" -> "integer") ~ ("nullable" -> nullable) ~ + ("name" -> "age") ~ ("type" -> "integer") ~ ("nullable" -> false) ~ ("metadata" -> List()) )))) ~ ("data" -> List( @@ -71,13 +69,7 @@ class SQLInterpreterSpec extends BaseInterpreterSpec { List[JValue]("Michael", 21) ))) ) - } - - val result = Try { resp1 should equal(expectedResult(false))} - .orElse(Try { resp1 should equal(expectedResult(true)) }) - if (result.isFailure) { - fail(s"$resp1 doesn't equal to expected result") - } + ) // Test empty result val resp2 = interpreter.execute( @@ -107,14 +99,14 @@ class SQLInterpreterSpec extends BaseInterpreterSpec { |SELECT * FROM test """.stripMargin) - val expectedResult = (nullable: Boolean) => { + resp1 should equal( Interpreter.ExecuteSuccess( APPLICATION_JSON -> (("schema" -> (("type" -> "struct") ~ ("fields" -> List( ("name" -> "col1") ~ ("type" -> "string") ~ ("nullable" -> true) ~ ("metadata" -> List()), - ("name" -> "col2") ~ ("type" -> "decimal(38,18)") ~ ("nullable" -> nullable) ~ + ("name" -> "col2") ~ ("type" -> "decimal(38,18)") ~ ("nullable" -> true) ~ ("metadata" -> List()) )))) ~ ("data" -> List( @@ -122,13 +114,7 @@ class SQLInterpreterSpec extends BaseInterpreterSpec { List[JValue]("2", 2.0d) ))) ) - } - - val result = Try { resp1 should equal(expectedResult(false))} - .orElse(Try { resp1 should equal(expectedResult(true)) }) - if (result.isFailure) { - fail(s"$resp1 doesn't equal to expected result") - } + ) } it should "throw exception for illegal query" in withInterpreter { interpreter => diff --git a/rsc/src/main/java/org/apache/livy/rsc/ContextLauncher.java b/rsc/src/main/java/org/apache/livy/rsc/ContextLauncher.java index f6034a759..790f912cf 100644 --- a/rsc/src/main/java/org/apache/livy/rsc/ContextLauncher.java +++ b/rsc/src/main/java/org/apache/livy/rsc/ContextLauncher.java @@ -218,15 +218,8 @@ public void run() { return new ChildProcess(conf, promise, child, confFile); } else { final SparkLauncher launcher = new SparkLauncher(); - - // Spark 1.x does not support specifying deploy mode in conf and needs special handling. - String deployMode = conf.get(SPARK_DEPLOY_MODE); - if (deployMode != null) { - launcher.setDeployMode(deployMode); - } - launcher.setSparkHome(System.getenv(SPARK_HOME_ENV)); - launcher.setAppResource("spark-internal"); + launcher.setAppResource(SparkLauncher.NO_RESOURCE); launcher.setPropertiesFile(confFile.getAbsolutePath()); launcher.setMainClass(RSCDriverBootstrapper.class.getName()); diff --git a/rsc/src/main/java/org/apache/livy/rsc/driver/JobContextImpl.java b/rsc/src/main/java/org/apache/livy/rsc/driver/JobContextImpl.java index b6bb52016..7c2b98729 100644 --- a/rsc/src/main/java/org/apache/livy/rsc/driver/JobContextImpl.java +++ b/rsc/src/main/java/org/apache/livy/rsc/driver/JobContextImpl.java @@ -63,8 +63,8 @@ public JavaSparkContext sc() { @SuppressWarnings("unchecked") @Override - public Object sparkSession() throws Exception { - return sparkEntries.sparkSession(); + public E sparkSession() throws Exception { + return (E) sparkEntries.sparkSession(); } @Override @@ -85,7 +85,7 @@ public synchronized JavaStreamingContext streamingctx(){ @SuppressWarnings("unchecked") @Override - public Object getSharedObject(String name) throws NoSuchElementException { + public E getSharedObject(String name) throws NoSuchElementException { Object obj; synchronized (sharedVariables) { // Remove the entry and insert again to achieve LRU. @@ -96,7 +96,7 @@ public Object getSharedObject(String name) throws NoSuchElementException { sharedVariables.put(name, obj); } - return obj; + return (E) obj; } @@ -109,13 +109,13 @@ public void setSharedObject(String name, Object object) { @SuppressWarnings("unchecked") @Override - public Object removeSharedObject(String name) { + public E removeSharedObject(String name) { Object obj; synchronized (sharedVariables) { obj = sharedVariables.remove(name); } - return obj; + return (E) obj; } @Override diff --git a/rsc/src/main/java/org/apache/livy/rsc/driver/SparkEntries.java b/rsc/src/main/java/org/apache/livy/rsc/driver/SparkEntries.java index c28bac912..c64fc72c9 100644 --- a/rsc/src/main/java/org/apache/livy/rsc/driver/SparkEntries.java +++ b/rsc/src/main/java/org/apache/livy/rsc/driver/SparkEntries.java @@ -24,6 +24,8 @@ import org.apache.spark.SparkContext; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.SQLContext; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.SparkSession$; import org.apache.spark.sql.hive.HiveContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,7 +38,7 @@ public class SparkEntries { private final SparkConf conf; private volatile SQLContext sqlctx; private volatile HiveContext hivectx; - private volatile Object sparksession; + private volatile SparkSession sparksession; public SparkEntries(SparkConf conf) { this.conf = conf; @@ -58,41 +60,29 @@ public JavaSparkContext sc() { return sc; } - @SuppressWarnings("unchecked") - public Object sparkSession() throws Exception { + public SparkSession sparkSession() throws Exception { if (sparksession == null) { synchronized (this) { if (sparksession == null) { + SparkSession.Builder builder = SparkSession.builder().sparkContext(sc().sc()); try { - Class clz = Class.forName("org.apache.spark.sql.SparkSession$"); - Object spark = clz.getField("MODULE$").get(null); - Method m = clz.getMethod("builder"); - Object builder = m.invoke(spark); - builder.getClass().getMethod("sparkContext", SparkContext.class) - .invoke(builder, sc().sc()); - SparkConf conf = sc().getConf(); - if (conf.get("spark.sql.catalogImplementation", "in-memory").toLowerCase() - .equals("hive")) { - if ((boolean) clz.getMethod("hiveClassesArePresent").invoke(spark)) { - ClassLoader loader = Thread.currentThread().getContextClassLoader() != null ? - Thread.currentThread().getContextClassLoader() : getClass().getClassLoader(); - if (loader.getResource("hive-site.xml") == null) { - LOG.warn("livy.repl.enable-hive-context is true but no hive-site.xml found on " + - "classpath"); - } + String catalog = conf.get("spark.sql.catalogImplementation", "in-memory").toLowerCase(); - builder.getClass().getMethod("enableHiveSupport").invoke(builder); - sparksession = builder.getClass().getMethod("getOrCreate").invoke(builder); - LOG.info("Created Spark session (with Hive support)."); - } else { - builder.getClass().getMethod("config", String.class, String.class) - .invoke(builder, "spark.sql.catalogImplementation", "in-memory"); - sparksession = builder.getClass().getMethod("getOrCreate").invoke(builder); - LOG.info("Created Spark session."); + if (catalog.equals("hive") && SparkSession$.MODULE$.hiveClassesArePresent()) { + ClassLoader loader = Thread.currentThread().getContextClassLoader() != null ? + Thread.currentThread().getContextClassLoader() : getClass().getClassLoader(); + if (loader.getResource("hive-site.xml") == null) { + LOG.warn("livy.repl.enable-hive-context is true but no hive-site.xml found on " + + "classpath"); } + + builder.enableHiveSupport(); + sparksession = builder.getOrCreate(); + LOG.info("Created Spark session (with Hive support)."); } else { - sparksession = builder.getClass().getMethod("getOrCreate").invoke(builder); + builder.config("spark.sql.catalogImplementation", "in-memory"); + sparksession = builder.getOrCreate(); LOG.info("Created Spark session."); } } catch (Exception e) { diff --git a/scala-api/scala-2.10/pom.xml b/scala-api/scala-2.10/pom.xml deleted file mode 100644 index ffff2407b..000000000 --- a/scala-api/scala-2.10/pom.xml +++ /dev/null @@ -1,38 +0,0 @@ - - - - 4.0.0 - org.apache.livy - livy-scala-api_2.10 - 0.6.0-incubating-SNAPSHOT - jar - - - org.apache.livy - livy-scala-api-parent - 0.6.0-incubating-SNAPSHOT - ../pom.xml - - - - ${scala-2.10.version} - 2.10 - ${spark.scala-2.10.version} - ${netty.spark-2.10.version} - - diff --git a/scala-api/src/main/resources/build.marker b/scala-api/src/main/resources/build.marker deleted file mode 100644 index e69de29bb..000000000 diff --git a/server/src/main/scala/org/apache/livy/LivyConf.scala b/server/src/main/scala/org/apache/livy/LivyConf.scala index 9e61f83c8..17030e9e5 100644 --- a/server/src/main/scala/org/apache/livy/LivyConf.scala +++ b/server/src/main/scala/org/apache/livy/LivyConf.scala @@ -47,8 +47,7 @@ object LivyConf { // Two configurations to specify Spark and related Scala version. These are internal // configurations will be set by LivyServer and used in session creation. It is not required to - // set usually unless running with unofficial Spark + Scala versions - // (like Spark 2.0 + Scala 2.10, Spark 1.6 + Scala 2.11) + // set usually unless running with unofficial Spark + Scala combinations. val LIVY_SPARK_SCALA_VERSION = Entry("livy.spark.scala-version", null) val LIVY_SPARK_VERSION = Entry("livy.spark.version", null) diff --git a/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala b/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala index 3d1e6cd8a..c15057bc5 100644 --- a/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala +++ b/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala @@ -83,9 +83,6 @@ object BatchSession extends Logging { request.queue.foreach(builder.queue) request.name.foreach(builder.name) - // Spark 1.x does not support specifying deploy mode in conf and needs special handling. - livyConf.sparkDeployMode().foreach(builder.deployMode) - sessionStore.save(BatchSession.RECOVERY_SESSION_TYPE, s.recoveryMetadata) builder.redirectOutput(Redirect.PIPE) diff --git a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala index 43a61ac14..3b3095f37 100644 --- a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala +++ b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala @@ -201,22 +201,14 @@ object InteractiveSession extends Logging { } else { val sparkHome = livyConf.sparkHome().get val libdir = sparkMajorVersion match { - case 1 => - if (new File(sparkHome, "RELEASE").isFile) { - new File(sparkHome, "lib") - } else { - new File(sparkHome, "lib_managed/jars") - } case 2 => if (new File(sparkHome, "RELEASE").isFile) { new File(sparkHome, "jars") - } else if (new File(sparkHome, "assembly/target/scala-2.11/jars").isDirectory) { - new File(sparkHome, "assembly/target/scala-2.11/jars") } else { - new File(sparkHome, "assembly/target/scala-2.10/jars") + new File(sparkHome, "assembly/target/scala-2.11/jars") } case v => - throw new RuntimeException("Unsupported spark major version:" + sparkMajorVersion) + throw new RuntimeException(s"Unsupported Spark major version: $sparkMajorVersion") } val jars = if (!libdir.isDirectory) { Seq.empty[String] @@ -342,13 +334,8 @@ object InteractiveSession extends Logging { // pass spark.livy.spark_major_version to driver builderProperties.put("spark.livy.spark_major_version", sparkMajorVersion.toString) - if (sparkMajorVersion <= 1) { - builderProperties.put("spark.repl.enableHiveContext", - livyConf.getBoolean(LivyConf.ENABLE_HIVE_CONTEXT).toString) - } else { - val confVal = if (enableHiveContext) "hive" else "in-memory" - builderProperties.put("spark.sql.catalogImplementation", confVal) - } + val confVal = if (enableHiveContext) "hive" else "in-memory" + builderProperties.put("spark.sql.catalogImplementation", confVal) if (enableHiveContext) { mergeHiveSiteAndHiveDeps(sparkMajorVersion) diff --git a/server/src/main/scala/org/apache/livy/utils/LivySparkUtils.scala b/server/src/main/scala/org/apache/livy/utils/LivySparkUtils.scala index 02a59c567..6097d3213 100644 --- a/server/src/main/scala/org/apache/livy/utils/LivySparkUtils.scala +++ b/server/src/main/scala/org/apache/livy/utils/LivySparkUtils.scala @@ -33,17 +33,11 @@ object LivySparkUtils extends Logging { // Spark 2.3 + Scala 2.11 (2, 3) -> "2.11", // Spark 2.2 + Scala 2.11 - (2, 2) -> "2.11", - // Spark 2.1 + Scala 2.11 - (2, 1) -> "2.11", - // Spark 2.0 + Scala 2.11 - (2, 0) -> "2.11", - // Spark 1.6 + Scala 2.10 - (1, 6) -> "2.10" + (2, 2) -> "2.11" ) // Supported Spark version - private val MIN_VERSION = (1, 6) + private val MIN_VERSION = (2, 2) private val MAX_VERSION = (2, 4) private val sparkVersionRegex = """version (.*)""".r.unanchored diff --git a/server/src/test/scala/org/apache/livy/server/interactive/BaseInteractiveServletSpec.scala b/server/src/test/scala/org/apache/livy/server/interactive/BaseInteractiveServletSpec.scala index 70e8653b8..b16e74ff2 100644 --- a/server/src/test/scala/org/apache/livy/server/interactive/BaseInteractiveServletSpec.scala +++ b/server/src/test/scala/org/apache/livy/server/interactive/BaseInteractiveServletSpec.scala @@ -50,8 +50,8 @@ abstract class BaseInteractiveServletSpec super.createConf() .set(LivyConf.SESSION_STAGING_DIR, tempDir.toURI().toString()) .set(LivyConf.REPL_JARS, "dummy.jar") - .set(LivyConf.LIVY_SPARK_VERSION, "1.6.0") - .set(LivyConf.LIVY_SPARK_SCALA_VERSION, "2.10.5") + .set(LivyConf.LIVY_SPARK_VERSION, sys.env("LIVY_SPARK_VERSION")) + .set(LivyConf.LIVY_SPARK_SCALA_VERSION, sys.env("LIVY_SCALA_VERSION")) } protected def createRequest( diff --git a/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionSpec.scala b/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionSpec.scala index 20e1f2dba..f07e61f5a 100644 --- a/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionSpec.scala +++ b/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionSpec.scala @@ -46,8 +46,8 @@ class InteractiveSessionSpec extends FunSpec private val livyConf = new LivyConf() livyConf.set(LivyConf.REPL_JARS, "dummy.jar") - .set(LivyConf.LIVY_SPARK_VERSION, "1.6.0") - .set(LivyConf.LIVY_SPARK_SCALA_VERSION, "2.10.5") + .set(LivyConf.LIVY_SPARK_VERSION, sys.env("LIVY_SPARK_VERSION")) + .set(LivyConf.LIVY_SPARK_SCALA_VERSION, sys.env("LIVY_SCALA_VERSION")) implicit val formats = DefaultFormats @@ -113,7 +113,7 @@ class InteractiveSessionSpec extends FunSpec ) val livyConf = new LivyConf(false) .set(LivyConf.REPL_JARS, testedJars.mkString(",")) - .set(LivyConf.LIVY_SPARK_VERSION, "1.6.2") + .set(LivyConf.LIVY_SPARK_VERSION, sys.env("LIVY_SPARK_VERSION")) .set(LivyConf.LIVY_SPARK_SCALA_VERSION, "2.10") val properties = InteractiveSession.prepareBuilderProp(Map.empty, Spark, livyConf) assert(properties(LivyConf.SPARK_JARS).split(",").toSet === Set("test_2.10-0.1.jar", @@ -139,7 +139,7 @@ class InteractiveSessionSpec extends FunSpec val livyConf = new LivyConf(false) .set(LivyConf.REPL_JARS, "dummy.jar") .set(LivyConf.RSC_JARS, rscJars.mkString(",")) - .set(LivyConf.LIVY_SPARK_VERSION, "1.6.2") + .set(LivyConf.LIVY_SPARK_VERSION, sys.env("LIVY_SPARK_VERSION")) .set(LivyConf.LIVY_SPARK_SCALA_VERSION, "2.10") val properties = InteractiveSession.prepareBuilderProp(Map.empty, Spark, livyConf) // if livy.rsc.jars is configured in LivyConf, it should be passed to RSCConf. diff --git a/server/src/test/scala/org/apache/livy/utils/LivySparkUtilsSuite.scala b/server/src/test/scala/org/apache/livy/utils/LivySparkUtilsSuite.scala index c0394e04d..ab3e7155a 100644 --- a/server/src/test/scala/org/apache/livy/utils/LivySparkUtilsSuite.scala +++ b/server/src/test/scala/org/apache/livy/utils/LivySparkUtilsSuite.scala @@ -43,33 +43,20 @@ class LivySparkUtilsSuite extends FunSuite with Matchers with LivyBaseUnitTestSu testSparkSubmit(livyConf) } - test("should support Spark 1.6") { - testSparkVersion("1.6.0") - testSparkVersion("1.6.1") - testSparkVersion("1.6.1-SNAPSHOT") - testSparkVersion("1.6.2") - testSparkVersion("1.6") - testSparkVersion("1.6.3.2.5.0-12") - } - - test("should support Spark 2.0.x") { - testSparkVersion("2.0.0") - testSparkVersion("2.0.1") - testSparkVersion("2.0.2") - testSparkVersion("2.0.3-SNAPSHOT") - testSparkVersion("2.0.0.2.5.1.0-56") // LIVY-229 - testSparkVersion("2.0") - testSparkVersion("2.1.0") - testSparkVersion("2.1.1") + test("should recognize supported Spark versions") { testSparkVersion("2.2.0") + testSparkVersion("2.3.0") } - test("should not support Spark older than 1.6") { + test("should complain about unsupported Spark versions") { intercept[IllegalArgumentException] { testSparkVersion("1.4.0") } intercept[IllegalArgumentException] { testSparkVersion("1.5.0") } intercept[IllegalArgumentException] { testSparkVersion("1.5.1") } intercept[IllegalArgumentException] { testSparkVersion("1.5.2") } intercept[IllegalArgumentException] { testSparkVersion("1.5.0-cdh5.6.1") } + intercept[IllegalArgumentException] { testSparkVersion("1.6.0") } + intercept[IllegalArgumentException] { testSparkVersion("2.0.1") } + intercept[IllegalArgumentException] { testSparkVersion("2.1.3") } } test("should fail on bad version") { @@ -96,14 +83,8 @@ class LivySparkUtilsSuite extends FunSuite with Matchers with LivyBaseUnitTestSu } test("defaultSparkScalaVersion() should return default Scala version") { - defaultSparkScalaVersion(formatSparkVersion("1.6.0")) shouldBe "2.10" - defaultSparkScalaVersion(formatSparkVersion("1.6.1")) shouldBe "2.10" - defaultSparkScalaVersion(formatSparkVersion("1.6.2")) shouldBe "2.10" - defaultSparkScalaVersion(formatSparkVersion("2.0.0")) shouldBe "2.11" - defaultSparkScalaVersion(formatSparkVersion("2.0.1")) shouldBe "2.11" - - // Throw exception for unsupported Spark version. - intercept[IllegalArgumentException] { defaultSparkScalaVersion(formatSparkVersion("1.5.0")) } + defaultSparkScalaVersion(formatSparkVersion("2.2.1")) shouldBe "2.11" + defaultSparkScalaVersion(formatSparkVersion("2.3.0")) shouldBe "2.11" } test("sparkScalaVersion() should use spark-submit detected Scala version.") { @@ -120,23 +101,8 @@ class LivySparkUtilsSuite extends FunSuite with Matchers with LivyBaseUnitTestSu } } - test("sparkScalaVersion() should use configured Scala version if spark-submit doesn't tell.") { - sparkScalaVersion(formatSparkVersion("1.6.0"), None, livyConf210) shouldBe "2.10" - sparkScalaVersion(formatSparkVersion("1.6.2"), None, livyConf210) shouldBe "2.10" - sparkScalaVersion(formatSparkVersion("2.0.0"), None, livyConf210) shouldBe "2.10" - sparkScalaVersion(formatSparkVersion("2.0.1"), None, livyConf210) shouldBe "2.10" - sparkScalaVersion(formatSparkVersion("1.6.0"), None, livyConf211) shouldBe "2.11" - sparkScalaVersion(formatSparkVersion("1.6.2"), None, livyConf211) shouldBe "2.11" - sparkScalaVersion(formatSparkVersion("2.0.0"), None, livyConf211) shouldBe "2.11" - sparkScalaVersion(formatSparkVersion("2.0.1"), None, livyConf211) shouldBe "2.11" - } - test("sparkScalaVersion() should use default Spark Scala version.") { - sparkScalaVersion(formatSparkVersion("1.6.0"), None, livyConf) shouldBe "2.10" - sparkScalaVersion(formatSparkVersion("1.6.2"), None, livyConf) shouldBe "2.10" - sparkScalaVersion(formatSparkVersion("2.0.0"), None, livyConf) shouldBe "2.11" - sparkScalaVersion(formatSparkVersion("2.0.1"), None, livyConf) shouldBe "2.11" - sparkScalaVersion(formatSparkVersion("2.1.0"), None, livyConf) shouldBe "2.11" sparkScalaVersion(formatSparkVersion("2.2.0"), None, livyConf) shouldBe "2.11" + sparkScalaVersion(formatSparkVersion("2.3.1"), None, livyConf) shouldBe "2.11" } } diff --git a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/rpc/RpcClient.scala b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/rpc/RpcClient.scala index 75bab0b63..605a810ff 100644 --- a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/rpc/RpcClient.scala +++ b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/rpc/RpcClient.scala @@ -24,6 +24,7 @@ import scala.collection.mutable.ArrayBuffer import scala.util.Try import org.apache.hive.service.cli.SessionHandle +import org.apache.spark.sql.{Row, SparkSession} import org.apache.livy._ import org.apache.livy.server.interactive.InteractiveSession @@ -34,11 +35,6 @@ import org.apache.livy.utils.LivySparkUtils class RpcClient(livySession: InteractiveSession) extends Logging { import RpcClient._ - private val isSpark1 = { - val (sparkMajorVersion, _) = - LivySparkUtils.formatSparkVersion(livySession.livyConf.get(LivyConf.LIVY_SPARK_VERSION)) - sparkMajorVersion == 1 - } private val defaultIncrementalCollect = livySession.livyConf.getBoolean(LivyConf.THRIFT_INCR_COLLECT_ENABLED).toString @@ -63,7 +59,6 @@ class RpcClient(livySession: InteractiveSession) extends Logging { rscClient.submit(executeSqlJob(sessionId(sessionHandle), statementId, statement, - isSpark1, defaultIncrementalCollect, s"spark.${LivyConf.THRIFT_INCR_COLLECT_ENABLED}")) } @@ -104,7 +99,7 @@ class RpcClient(livySession: InteractiveSession) extends Logging { def executeRegisterSession(sessionHandle: SessionHandle): JobHandle[_] = { info(s"RSC client is executing register session $sessionHandle") livySession.recordActivity() - rscClient.submit(registerSessionJob(sessionId(sessionHandle), isSpark1)) + rscClient.submit(registerSessionJob(sessionId(sessionHandle))) } /** @@ -123,23 +118,18 @@ class RpcClient(livySession: InteractiveSession) extends Logging { * order to enforce that we are not accessing any class attribute */ object RpcClient { - // Maps a session ID to its SparkSession (or HiveContext/SQLContext according to the Spark - // version used) + // Maps a session ID to its SparkSession. val SESSION_SPARK_ENTRY_MAP = "livy.thriftserver.rpc_sessionIdToSparkSQLSession" val STATEMENT_RESULT_ITER_MAP = "livy.thriftserver.rpc_statementIdToResultIter" val STATEMENT_SCHEMA_MAP = "livy.thriftserver.rpc_statementIdToSchema" - private def registerSessionJob(sessionId: String, isSpark1: Boolean): Job[_] = new Job[Boolean] { + private def registerSessionJob(sessionId: String): Job[_] = new Job[Boolean] { override def call(jc: JobContext): Boolean = { - val spark: Any = if (isSpark1) { - Option(jc.hivectx()).getOrElse(jc.sqlctx()) - } else { - jc.sparkSession() - } - val sessionSpecificSpark = spark.getClass.getMethod("newSession").invoke(spark) + val spark = jc.sparkSession[SparkSession]() + val sessionSpecificSpark = spark.newSession() jc.sc().synchronized { val existingMap = - Try(jc.getSharedObject[HashMap[String, AnyRef]](SESSION_SPARK_ENTRY_MAP)) + Try(jc.getSharedObject[HashMap[String, SparkSession]](SESSION_SPARK_ENTRY_MAP)) .getOrElse(new HashMap[String, AnyRef]()) jc.setSharedObject(SESSION_SPARK_ENTRY_MAP, existingMap + ((sessionId, sessionSpecificSpark))) @@ -147,9 +137,9 @@ object RpcClient { .failed.foreach { _ => jc.setSharedObject(STATEMENT_SCHEMA_MAP, new HashMap[String, String]()) } - Try(jc.getSharedObject[HashMap[String, Iterator[_]]](STATEMENT_RESULT_ITER_MAP)) + Try(jc.getSharedObject[HashMap[String, Iterator[Row]]](STATEMENT_RESULT_ITER_MAP)) .failed.foreach { _ => - jc.setSharedObject(STATEMENT_RESULT_ITER_MAP, new HashMap[String, Iterator[_]]()) + jc.setSharedObject(STATEMENT_RESULT_ITER_MAP, new HashMap[String, Iterator[Row]]()) } } true @@ -160,7 +150,7 @@ object RpcClient { override def call(jobContext: JobContext): Boolean = { jobContext.sc().synchronized { val existingMap = - jobContext.getSharedObject[HashMap[String, AnyRef]](SESSION_SPARK_ENTRY_MAP) + jobContext.getSharedObject[HashMap[String, SparkSession]](SESSION_SPARK_ENTRY_MAP) jobContext.setSharedObject(SESSION_SPARK_ENTRY_MAP, existingMap - sessionId) } true @@ -176,7 +166,7 @@ object RpcClient { if (sparkContext.getLocalProperty("spark.jobGroup.id") == statementId) { sparkContext.clearJobGroup() } - val iterMap = jc.getSharedObject[HashMap[String, Iterator[_]]](STATEMENT_RESULT_ITER_MAP) + val iterMap = jc.getSharedObject[HashMap[String, Iterator[Row]]](STATEMENT_RESULT_ITER_MAP) jc.setSharedObject(STATEMENT_RESULT_ITER_MAP, iterMap - statementId) val schemaMap = jc.getSharedObject[HashMap[String, String]](STATEMENT_SCHEMA_MAP) jc.setSharedObject(STATEMENT_SCHEMA_MAP, schemaMap - statementId) @@ -196,7 +186,7 @@ object RpcClient { maxRows: Int): Job[ColumnOrientedResultSet] = new Job[ColumnOrientedResultSet] { override def call(jobContext: JobContext): ColumnOrientedResultSet = { val statementIterMap = - jobContext.getSharedObject[HashMap[String, Iterator[_]]](STATEMENT_RESULT_ITER_MAP) + jobContext.getSharedObject[HashMap[String, Iterator[Row]]](STATEMENT_RESULT_ITER_MAP) val iter = statementIterMap(statementId) if (null == iter) { @@ -212,13 +202,13 @@ object RpcClient { var curRow = 0 while (curRow < maxRows && iter.hasNext) { val sparkRow = iter.next() - val row = ArrayBuffer[Any]() + val row = ArrayBuffer[Object]() var curCol: Integer = 0 while (curCol < numOfColumns) { - row += sparkRow.getClass.getMethod("get", classOf[Int]).invoke(sparkRow, curCol) + row += sparkRow.get(curCol).asInstanceOf[Object] curCol += 1 } - resultSet.addRow(row.toArray.asInstanceOf[Array[Object]]) + resultSet.addRow(row.toArray) curRow += 1 } resultSet @@ -229,7 +219,6 @@ object RpcClient { private def executeSqlJob(sessionId: String, statementId: String, statement: String, - isSpark1: Boolean, defaultIncrementalCollect: String, incrementalCollectEnabledProp: String): Job[_] = new Job[Boolean] { override def call(jc: JobContext): Boolean = { @@ -237,46 +226,31 @@ object RpcClient { sparkContext.synchronized { sparkContext.setJobGroup(statementId, statement) } - val spark = jc.getSharedObject[HashMap[String, AnyRef]](SESSION_SPARK_ENTRY_MAP)(sessionId) + val spark = + jc.getSharedObject[HashMap[String, SparkSession]](SESSION_SPARK_ENTRY_MAP)(sessionId) try { - val result = spark.getClass.getMethod("sql", classOf[String]).invoke(spark, statement) - val schema = result.getClass.getMethod("schema").invoke(result) - val jsonString = schema.getClass.getMethod("json").invoke(schema).asInstanceOf[String] + val result = spark.sql(statement) + val jsonSchema = result.schema.json // Set the schema in the shared map sparkContext.synchronized { val existingMap = jc.getSharedObject[HashMap[String, String]](STATEMENT_SCHEMA_MAP) - jc.setSharedObject(STATEMENT_SCHEMA_MAP, existingMap + ((statementId, jsonString))) + jc.setSharedObject(STATEMENT_SCHEMA_MAP, existingMap + ((statementId, jsonSchema))) } - val incrementalCollect = { - if (isSpark1) { - spark.getClass.getMethod("getConf", classOf[String], classOf[String]) - .invoke(spark, - incrementalCollectEnabledProp, - defaultIncrementalCollect) - .asInstanceOf[String].toBoolean - } else { - val conf = spark.getClass.getMethod("conf").invoke(spark) - conf.getClass.getMethod("get", classOf[String], classOf[String]) - .invoke(conf, - incrementalCollectEnabledProp, - defaultIncrementalCollect) - .asInstanceOf[String].toBoolean - } - } + val incrementalCollect = spark.conf.get(incrementalCollectEnabledProp, + defaultIncrementalCollect).toBoolean val iter = if (incrementalCollect) { - val rdd = result.getClass.getMethod("rdd").invoke(result) - rdd.getClass.getMethod("toLocalIterator").invoke(rdd).asInstanceOf[Iterator[_]] + result.rdd.toLocalIterator } else { - result.getClass.getMethod("collect").invoke(result).asInstanceOf[Array[_]].iterator + result.collect().iterator } // Set the iterator in the shared map sparkContext.synchronized { val existingMap = - jc.getSharedObject[HashMap[String, Iterator[_]]](STATEMENT_RESULT_ITER_MAP) + jc.getSharedObject[HashMap[String, Iterator[Row]]](STATEMENT_RESULT_ITER_MAP) jc.setSharedObject(STATEMENT_RESULT_ITER_MAP, existingMap + ((statementId, iter))) } } catch {