From 03ef6be9ce61a13dcd9d8c71298fb4be39119411 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Mon, 8 Jun 2015 17:50:38 +0800
Subject: [PATCH 01/18] [SPARK-7939] [SQL] Add conf to enable/disable partition column type inference

JIRA: https://issues.apache.org/jira/browse/SPARK-7939

Author: Liang-Chi Hsieh

Closes #6503 from viirya/disable_partition_type_inference and squashes the following commits:

3e90470 [Liang-Chi Hsieh] Default to enable type inference and update docs.
455edb1 [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into disable_partition_type_inference
9a57933 [Liang-Chi Hsieh] Add conf to enable/disable partition column type inference.
---
 docs/sql-programming-guide.md                 |  6 +-
 .../scala/org/apache/spark/sql/SQLConf.scala  |  6 ++
 .../spark/sql/sources/PartitioningUtils.scala | 48 ++++++-----
 .../apache/spark/sql/sources/interfaces.scala |  4 +-
 .../ParquetPartitionDiscoverySuite.scala      | 79 ++++++++++++++++++-
 5 files changed, 119 insertions(+), 24 deletions(-)

diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index cde5830c733e0..40e33f757d693 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -1102,7 +1102,11 @@ root
 {% endhighlight %}

 Notice that the data types of the partitioning columns are automatically inferred. Currently,
-numeric data types and string type are supported.
+numeric data types and string type are supported. Sometimes users may not want to automatically
+infer the data types of the partitioning columns. For these use cases, the automatic type inference
+can be configured by `spark.sql.sources.partitionColumnTypeInference.enabled`, which defaults to
+`true`. When type inference is disabled, string type will be used for the partitioning columns.
+

 ### Schema merging

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index 77c6af27d1007..c778889045d02 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -71,6 +71,9 @@ private[spark] object SQLConf {
   // Whether to perform partition discovery when loading external data sources. Default to true.
   val PARTITION_DISCOVERY_ENABLED = "spark.sql.sources.partitionDiscovery.enabled"

+  // Whether to perform partition column type inference. Default to true.
+  val PARTITION_COLUMN_TYPE_INFERENCE = "spark.sql.sources.partitionColumnTypeInference.enabled"
+
   // The output committer class used by FSBasedRelation. The specified class needs to be a
   // subclass of org.apache.hadoop.mapreduce.OutputCommitter.
   val OUTPUT_COMMITTER_CLASS = "spark.sql.sources.outputCommitterClass"
@@ -250,6 +253,9 @@ private[sql] class SQLConf extends Serializable with CatalystConf {
   private[spark] def partitionDiscoveryEnabled() =
     getConf(SQLConf.PARTITION_DISCOVERY_ENABLED, "true").toBoolean

+  private[spark] def partitionColumnTypeInferenceEnabled() =
+    getConf(SQLConf.PARTITION_COLUMN_TYPE_INFERENCE, "true").toBoolean
+
   // Do not use a value larger than 4000 as the default value of this property.
   // See the comments of SCHEMA_STRING_LENGTH_THRESHOLD above for more information.
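As an illustrative sketch only (not part of the patch), disabling the new setting from a `SQLContext` might look like the following; the table path and partition layout are invented:

    // Turn off partition column type inference; partition columns are then read as strings.
    sqlContext.setConf("spark.sql.sources.partitionColumnTypeInference.enabled", "false")
    // For a layout such as /data/table/a=10/b=hello, both `a` and `b` now come back as StringType.
    val df = sqlContext.read.parquet("/data/table")
    df.printSchema()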
private[spark] def schemaStringLengthThreshold: Int = diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala index c4c99de5a38dc..9f6ec2ed8fc8d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala @@ -72,10 +72,11 @@ private[sql] object PartitioningUtils { */ private[sql] def parsePartitions( paths: Seq[Path], - defaultPartitionName: String): PartitionSpec = { + defaultPartitionName: String, + typeInference: Boolean): PartitionSpec = { // First, we need to parse every partition's path and see if we can find partition values. val pathsWithPartitionValues = paths.flatMap { path => - parsePartition(path, defaultPartitionName).map(path -> _) + parsePartition(path, defaultPartitionName, typeInference).map(path -> _) } if (pathsWithPartitionValues.isEmpty) { @@ -124,7 +125,8 @@ private[sql] object PartitioningUtils { */ private[sql] def parsePartition( path: Path, - defaultPartitionName: String): Option[PartitionValues] = { + defaultPartitionName: String, + typeInference: Boolean): Option[PartitionValues] = { val columns = ArrayBuffer.empty[(String, Literal)] // Old Hadoop versions don't have `Path.isRoot` var finished = path.getParent == null @@ -137,7 +139,7 @@ private[sql] object PartitioningUtils { return None } - val maybeColumn = parsePartitionColumn(chopped.getName, defaultPartitionName) + val maybeColumn = parsePartitionColumn(chopped.getName, defaultPartitionName, typeInference) maybeColumn.foreach(columns += _) chopped = chopped.getParent finished = maybeColumn.isEmpty || chopped.getParent == null @@ -153,7 +155,8 @@ private[sql] object PartitioningUtils { private def parsePartitionColumn( columnSpec: String, - defaultPartitionName: String): Option[(String, Literal)] = { + defaultPartitionName: String, + typeInference: Boolean): Option[(String, Literal)] = { val equalSignIndex = columnSpec.indexOf('=') if (equalSignIndex == -1) { None @@ -164,7 +167,7 @@ private[sql] object PartitioningUtils { val rawColumnValue = columnSpec.drop(equalSignIndex + 1) assert(rawColumnValue.nonEmpty, s"Empty partition column value in '$columnSpec'") - val literal = inferPartitionColumnValue(rawColumnValue, defaultPartitionName) + val literal = inferPartitionColumnValue(rawColumnValue, defaultPartitionName, typeInference) Some(columnName -> literal) } } @@ -211,19 +214,28 @@ private[sql] object PartitioningUtils { */ private[sql] def inferPartitionColumnValue( raw: String, - defaultPartitionName: String): Literal = { - // First tries integral types - Try(Literal.create(Integer.parseInt(raw), IntegerType)) - .orElse(Try(Literal.create(JLong.parseLong(raw), LongType))) - // Then falls back to fractional types - .orElse(Try(Literal.create(JFloat.parseFloat(raw), FloatType))) - .orElse(Try(Literal.create(JDouble.parseDouble(raw), DoubleType))) - .orElse(Try(Literal.create(new JBigDecimal(raw), DecimalType.Unlimited))) - // Then falls back to string - .getOrElse { - if (raw == defaultPartitionName) Literal.create(null, NullType) - else Literal.create(unescapePathName(raw), StringType) + defaultPartitionName: String, + typeInference: Boolean): Literal = { + if (typeInference) { + // First tries integral types + Try(Literal.create(Integer.parseInt(raw), IntegerType)) + .orElse(Try(Literal.create(JLong.parseLong(raw), LongType))) + // Then falls back to fractional types + 
.orElse(Try(Literal.create(JFloat.parseFloat(raw), FloatType))) + .orElse(Try(Literal.create(JDouble.parseDouble(raw), DoubleType))) + .orElse(Try(Literal.create(new JBigDecimal(raw), DecimalType.Unlimited))) + // Then falls back to string + .getOrElse { + if (raw == defaultPartitionName) Literal.create(null, NullType) + else Literal.create(unescapePathName(raw), StringType) + } + } else { + if (raw == defaultPartitionName) { + Literal.create(null, NullType) + } else { + Literal.create(unescapePathName(raw), StringType) } + } } private val upCastingOrder: Seq[DataType] = diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala index 25887ba9a15b0..d1547fb1e4abb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala @@ -491,9 +491,11 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio } private def discoverPartitions(): PartitionSpec = { + val typeInference = sqlContext.conf.partitionColumnTypeInferenceEnabled() // We use leaf dirs containing data files to discover the schema. val leafDirs = fileStatusCache.leafDirToChildrenFiles.keys.toSeq - PartitioningUtils.parsePartitions(leafDirs, PartitioningUtils.DEFAULT_PARTITION_NAME) + PartitioningUtils.parsePartitions(leafDirs, PartitioningUtils.DEFAULT_PARTITION_NAME, + typeInference) } /** diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala index d9a010a9815a1..c2f1cc8ffd1fb 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala @@ -48,7 +48,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest { test("column type inference") { def check(raw: String, literal: Literal): Unit = { - assert(inferPartitionColumnValue(raw, defaultPartitionName) === literal) + assert(inferPartitionColumnValue(raw, defaultPartitionName, true) === literal) } check("10", Literal.create(10, IntegerType)) @@ -60,12 +60,12 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest { test("parse partition") { def check(path: String, expected: Option[PartitionValues]): Unit = { - assert(expected === parsePartition(new Path(path), defaultPartitionName)) + assert(expected === parsePartition(new Path(path), defaultPartitionName, true)) } def checkThrows[T <: Throwable: Manifest](path: String, expected: String): Unit = { val message = intercept[T] { - parsePartition(new Path(path), defaultPartitionName).get + parsePartition(new Path(path), defaultPartitionName, true).get }.getMessage assert(message.contains(expected)) @@ -105,7 +105,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest { test("parse partitions") { def check(paths: Seq[String], spec: PartitionSpec): Unit = { - assert(parsePartitions(paths.map(new Path(_)), defaultPartitionName) === spec) + assert(parsePartitions(paths.map(new Path(_)), defaultPartitionName, true) === spec) } check(Seq( @@ -174,6 +174,77 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest { PartitionSpec.emptySpec) } + test("parse partitions with type inference disabled") { + def check(paths: Seq[String], spec: PartitionSpec): Unit = { + 
assert(parsePartitions(paths.map(new Path(_)), defaultPartitionName, false) === spec) + } + + check(Seq( + "hdfs://host:9000/path/a=10/b=hello"), + PartitionSpec( + StructType(Seq( + StructField("a", StringType), + StructField("b", StringType))), + Seq(Partition(Row("10", "hello"), "hdfs://host:9000/path/a=10/b=hello")))) + + check(Seq( + "hdfs://host:9000/path/a=10/b=20", + "hdfs://host:9000/path/a=10.5/b=hello"), + PartitionSpec( + StructType(Seq( + StructField("a", StringType), + StructField("b", StringType))), + Seq( + Partition(Row("10", "20"), "hdfs://host:9000/path/a=10/b=20"), + Partition(Row("10.5", "hello"), "hdfs://host:9000/path/a=10.5/b=hello")))) + + check(Seq( + "hdfs://host:9000/path/_temporary", + "hdfs://host:9000/path/a=10/b=20", + "hdfs://host:9000/path/a=10.5/b=hello", + "hdfs://host:9000/path/a=10.5/_temporary", + "hdfs://host:9000/path/a=10.5/_TeMpOrArY", + "hdfs://host:9000/path/a=10.5/b=hello/_temporary", + "hdfs://host:9000/path/a=10.5/b=hello/_TEMPORARY", + "hdfs://host:9000/path/_temporary/path", + "hdfs://host:9000/path/a=11/_temporary/path", + "hdfs://host:9000/path/a=10.5/b=world/_temporary/path"), + PartitionSpec( + StructType(Seq( + StructField("a", StringType), + StructField("b", StringType))), + Seq( + Partition(Row("10", "20"), "hdfs://host:9000/path/a=10/b=20"), + Partition(Row("10.5", "hello"), "hdfs://host:9000/path/a=10.5/b=hello")))) + + check(Seq( + s"hdfs://host:9000/path/a=10/b=20", + s"hdfs://host:9000/path/a=$defaultPartitionName/b=hello"), + PartitionSpec( + StructType(Seq( + StructField("a", StringType), + StructField("b", StringType))), + Seq( + Partition(Row("10", "20"), s"hdfs://host:9000/path/a=10/b=20"), + Partition(Row(null, "hello"), s"hdfs://host:9000/path/a=$defaultPartitionName/b=hello")))) + + check(Seq( + s"hdfs://host:9000/path/a=10/b=$defaultPartitionName", + s"hdfs://host:9000/path/a=10.5/b=$defaultPartitionName"), + PartitionSpec( + StructType(Seq( + StructField("a", StringType), + StructField("b", StringType))), + Seq( + Partition(Row("10", null), s"hdfs://host:9000/path/a=10/b=$defaultPartitionName"), + Partition(Row("10.5", null), s"hdfs://host:9000/path/a=10.5/b=$defaultPartitionName")))) + + check(Seq( + s"hdfs://host:9000/path1", + s"hdfs://host:9000/path2"), + PartitionSpec.emptySpec) + } + test("read partitioned table - normal case") { withTempDir { base => for { From a1d9e5cc60d317ecf8fe390b66b623ae39c4534d Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Mon, 8 Jun 2015 15:37:28 +0100 Subject: [PATCH 02/18] [SPARK-8126] [BUILD] Use custom temp directory during build. Even with all the efforts to cleanup the temp directories created by unit tests, Spark leaves a lot of garbage in /tmp after a test run. This change overrides java.io.tmpdir to place those files under the build directory instead. After an sbt full unit test run, I was left with > 400 MB of temp files. Since they're now under the build dir, it's much easier to clean them up. Also make a slight change to a unit test to make it not pollute the source directory with test data. Author: Marcelo Vanzin Closes #6674 from vanzin/SPARK-8126 and squashes the following commits: 0f8ad41 [Marcelo Vanzin] Make sure tmp dir exists when tests run. 643e916 [Marcelo Vanzin] [MINOR] [BUILD] Use custom temp directory during build. 
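A rough sketch of the mechanism behind this change (not part of the patch): helpers such as `Utils.createTempDir()` default their root directory to the `java.io.tmpdir` system property, so pointing that property at the build directory keeps test data under `target/tmp`:

    // Assumes the test JVM was started with -Djava.io.tmpdir=<spark-home>/target/tmp
    val dir = org.apache.spark.util.Utils.createTempDir() // root defaults to java.io.tmpdir
    println(dir.getAbsolutePath)                          // now under target/tmp rather than /tmp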
--- .../spark/deploy/SparkSubmitUtilsSuite.scala | 22 +++++++++-------- pom.xml | 24 ++++++++++++++++++- project/SparkBuild.scala | 6 +++++ 3 files changed, 41 insertions(+), 11 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala index 8fda5c8b472c9..07d261cc428c4 100644 --- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala @@ -28,9 +28,12 @@ import org.apache.ivy.plugins.resolver.IBiblioResolver import org.apache.spark.SparkFunSuite import org.apache.spark.deploy.SparkSubmitUtils.MavenCoordinate +import org.apache.spark.util.Utils class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll { + private var tempIvyPath: String = _ + private val noOpOutputStream = new OutputStream { def write(b: Int) = {} } @@ -47,6 +50,7 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll { super.beforeAll() // We don't want to write logs during testing SparkSubmitUtils.printStream = new BufferPrintStream + tempIvyPath = Utils.createTempDir(namePrefix = "ivy").getAbsolutePath() } test("incorrect maven coordinate throws error") { @@ -90,21 +94,20 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll { } test("ivy path works correctly") { - val ivyPath = "dummy" + File.separator + "ivy" val md = SparkSubmitUtils.getModuleDescriptor val artifacts = for (i <- 0 until 3) yield new MDArtifact(md, s"jar-$i", "jar", "jar") - var jPaths = SparkSubmitUtils.resolveDependencyPaths(artifacts.toArray, new File(ivyPath)) + var jPaths = SparkSubmitUtils.resolveDependencyPaths(artifacts.toArray, new File(tempIvyPath)) for (i <- 0 until 3) { - val index = jPaths.indexOf(ivyPath) + val index = jPaths.indexOf(tempIvyPath) assert(index >= 0) - jPaths = jPaths.substring(index + ivyPath.length) + jPaths = jPaths.substring(index + tempIvyPath.length) } val main = MavenCoordinate("my.awesome.lib", "mylib", "0.1") IvyTestUtils.withRepository(main, None, None) { repo => // end to end val jarPath = SparkSubmitUtils.resolveMavenCoordinates(main.toString, Option(repo), - Option(ivyPath), true) - assert(jarPath.indexOf(ivyPath) >= 0, "should use non-default ivy path") + Option(tempIvyPath), true) + assert(jarPath.indexOf(tempIvyPath) >= 0, "should use non-default ivy path") } } @@ -123,13 +126,12 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll { assert(jarPath.indexOf("mylib") >= 0, "should find artifact") } // Local ivy repository with modified home - val dummyIvyPath = "dummy" + File.separator + "ivy" - val dummyIvyLocal = new File(dummyIvyPath, "local" + File.separator) + val dummyIvyLocal = new File(tempIvyPath, "local" + File.separator) IvyTestUtils.withRepository(main, None, Some(dummyIvyLocal), true) { repo => val jarPath = SparkSubmitUtils.resolveMavenCoordinates(main.toString, None, - Some(dummyIvyPath), true) + Some(tempIvyPath), true) assert(jarPath.indexOf("mylib") >= 0, "should find artifact") - assert(jarPath.indexOf(dummyIvyPath) >= 0, "should be in new ivy path") + assert(jarPath.indexOf(tempIvyPath) >= 0, "should be in new ivy path") } } diff --git a/pom.xml b/pom.xml index 67b6375f576d3..5a5d183e3dcca 100644 --- a/pom.xml +++ b/pom.xml @@ -179,7 +179,7 @@ compile ${session.executionRootDirectory} @@ -1256,6 +1256,7 @@ test true + ${project.build.directory}/tmp ${spark.test.home} 1 false @@ -1289,6 +1290,7 @@ test 
true + ${project.build.directory}/tmp ${spark.test.home} 1 false @@ -1548,6 +1550,26 @@ + + + org.apache.maven.plugins + maven-antrun-plugin + + + create-tmp-dir + generate-test-resources + + run + + + + + + + + + + org.apache.maven.plugins diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index ef3a175bac209..d7e374558c5e2 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -51,6 +51,11 @@ object BuildCommons { // Root project. val spark = ProjectRef(buildLocation, "spark") val sparkHome = buildLocation + + val testTempDir = s"$sparkHome/target/tmp" + if (!new File(testTempDir).isDirectory()) { + require(new File(testTempDir).mkdirs()) + } } object SparkBuild extends PomBuild { @@ -496,6 +501,7 @@ object TestSettings { "SPARK_DIST_CLASSPATH" -> (fullClasspath in Test).value.files.map(_.getAbsolutePath).mkString(":").stripSuffix(":"), "JAVA_HOME" -> sys.env.get("JAVA_HOME").getOrElse(sys.props("java.home"))), + javaOptions in Test += s"-Djava.io.tmpdir=$testTempDir", javaOptions in Test += "-Dspark.test.home=" + sparkHome, javaOptions in Test += "-Dspark.testing=1", javaOptions in Test += "-Dspark.port.maxRetries=100", From e3e9c70384028cc0c322ccea14f19d3b6d6b39eb Mon Sep 17 00:00:00 2001 From: MechCoder Date: Mon, 8 Jun 2015 15:45:12 +0100 Subject: [PATCH 03/18] [SPARK-8140] [MLLIB] Remove empty model check in StreamingLinearAlgorithm 1. Prevent creating a map of data to find numFeatures 2. If model is empty, then initialize with a zero vector of numFeature Author: MechCoder Closes #6684 from MechCoder/spark-8140 and squashes the following commits: 7fbf5f9 [MechCoder] [SPARK-8140] Remove empty model check in StreamingLinearAlgorithm And other minor cosmits --- .../apache/spark/mllib/optimization/GradientDescent.scala | 2 +- .../spark/mllib/regression/GeneralizedLinearAlgorithm.scala | 6 +++--- .../spark/mllib/regression/StreamingLinearAlgorithm.scala | 3 --- .../mllib/regression/StreamingLinearRegressionWithSGD.scala | 2 +- 4 files changed, 5 insertions(+), 8 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala index 4b7d0589c973b..06e45e10c5bf4 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala @@ -179,7 +179,7 @@ object GradientDescent extends Logging { * if it's L2 updater; for L1 updater, the same logic is followed. 
*/ var regVal = updater.compute( - weights, Vectors.dense(new Array[Double](weights.size)), 0, 1, regParam)._2 + weights, Vectors.zeros(weights.size), 0, 1, regParam)._2 for (i <- 1 to numIterations) { val bcWeights = data.context.broadcast(weights) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala index 26be30ff9d6fd..6709bd79bc820 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala @@ -195,11 +195,11 @@ abstract class GeneralizedLinearAlgorithm[M <: GeneralizedLinearModel] */ val initialWeights = { if (numOfLinearPredictor == 1) { - Vectors.dense(new Array[Double](numFeatures)) + Vectors.zeros(numFeatures) } else if (addIntercept) { - Vectors.dense(new Array[Double]((numFeatures + 1) * numOfLinearPredictor)) + Vectors.zeros((numFeatures + 1) * numOfLinearPredictor) } else { - Vectors.dense(new Array[Double](numFeatures * numOfLinearPredictor)) + Vectors.zeros(numFeatures * numOfLinearPredictor) } } run(input, initialWeights) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearAlgorithm.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearAlgorithm.scala index cea8f3f47307b..39308e5ae1dde 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearAlgorithm.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearAlgorithm.scala @@ -87,9 +87,6 @@ abstract class StreamingLinearAlgorithm[ model match { case Some(m) => m.weights - case None => - val numFeatures = rdd.first().features.size - Vectors.dense(numFeatures) } model = Some(algorithm.run(rdd, initialWeights)) logInfo("Model updated at time %s".format(time.toString)) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionWithSGD.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionWithSGD.scala index a49153bf73c0d..235e043c7754b 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionWithSGD.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionWithSGD.scala @@ -79,7 +79,7 @@ class StreamingLinearRegressionWithSGD private[mllib] ( this } - /** Set the initial weights. Default: [0.0, 0.0]. */ + /** Set the initial weights. 
*/ def setInitialWeights(initialWeights: Vector): this.type = { this.model = Some(algorithm.createModel(initialWeights, 0.0)) this From 149d1b28e899177ed170292fd2af30aad5a610e0 Mon Sep 17 00:00:00 2001 From: Mingfei Date: Mon, 8 Jun 2015 16:23:43 +0100 Subject: [PATCH 04/18] [SMALL FIX] Return null if catch EOFException Return null if catch EOFException, just like function "asKeyValueIterator" in this class Author: Mingfei Closes #6703 from shimingfei/returnNull and squashes the following commits: 205deec [Mingfei] return null if catch EOFException --- core/src/main/scala/org/apache/spark/serializer/Serializer.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/core/src/main/scala/org/apache/spark/serializer/Serializer.scala b/core/src/main/scala/org/apache/spark/serializer/Serializer.scala index f1bdff96d3df1..bd2704dc81871 100644 --- a/core/src/main/scala/org/apache/spark/serializer/Serializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/Serializer.scala @@ -182,6 +182,7 @@ abstract class DeserializationStream { } catch { case eof: EOFException => finished = true + null } } From 49f19b954b32d57d03ca0e25ea4205d01e794d48 Mon Sep 17 00:00:00 2001 From: Daoyuan Wang Date: Mon, 8 Jun 2015 09:41:06 -0700 Subject: [PATCH 05/18] [MINOR] change new Exception to IllegalArgumentException Author: Daoyuan Wang Closes #6434 from adrian-wang/joinerr and squashes the following commits: ee1b64f [Daoyuan Wang] break line f7c53e9 [Daoyuan Wang] to IllegalArgumentException f8dea2d [Daoyuan Wang] sys.err to IllegalStateException be82259 [Daoyuan Wang] change new exception to sys.err --- .../apache/spark/sql/execution/joins/HashOuterJoin.scala | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala index 45574392996ca..c21a453115292 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala @@ -48,7 +48,8 @@ case class HashOuterJoin( case LeftOuter => left.outputPartitioning case RightOuter => right.outputPartitioning case FullOuter => UnknownPartitioning(left.outputPartitioning.numPartitions) - case x => throw new Exception(s"HashOuterJoin should not take $x as the JoinType") + case x => + throw new IllegalArgumentException(s"HashOuterJoin should not take $x as the JoinType") } override def requiredChildDistribution: Seq[ClusteredDistribution] = @@ -63,7 +64,7 @@ case class HashOuterJoin( case FullOuter => left.output.map(_.withNullability(true)) ++ right.output.map(_.withNullability(true)) case x => - throw new Exception(s"HashOuterJoin should not take $x as the JoinType") + throw new IllegalArgumentException(s"HashOuterJoin should not take $x as the JoinType") } } @@ -216,7 +217,8 @@ case class HashOuterJoin( rightHashTable.getOrElse(key, EMPTY_LIST), joinedRow) } - case x => throw new Exception(s"HashOuterJoin should not take $x as the JoinType") + case x => + throw new IllegalArgumentException(s"HashOuterJoin should not take $x as the JoinType") } } } From ed5c2dccd0397c4c4b0008c437e6845dd583c9c2 Mon Sep 17 00:00:00 2001 From: Daoyuan Wang Date: Mon, 8 Jun 2015 11:06:27 -0700 Subject: [PATCH 06/18] [SPARK-8158] [SQL] several fix for HiveShim 1. explicitly import implicit conversion support. 2. use .nonEmpty instead of .size > 0 3. use val instead of var 4. 
comment indention Author: Daoyuan Wang Closes #6700 from adrian-wang/shimsimprove and squashes the following commits: d22e108 [Daoyuan Wang] several fix for HiveShim --- .../org/apache/spark/sql/hive/HiveShim.scala | 21 ++++++++++--------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala index fa5409f602444..d08c594151654 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala @@ -20,6 +20,11 @@ package org.apache.spark.sql.hive import java.io.{InputStream, OutputStream} import java.rmi.server.UID +/* Implicit conversions */ +import scala.collection.JavaConversions._ +import scala.language.implicitConversions +import scala.reflect.ClassTag + import com.esotericsoftware.kryo.Kryo import com.esotericsoftware.kryo.io.{Input, Output} import org.apache.hadoop.conf.Configuration @@ -35,10 +40,6 @@ import org.apache.spark.Logging import org.apache.spark.sql.types.Decimal import org.apache.spark.util.Utils -/* Implicit conversions */ -import scala.collection.JavaConversions._ -import scala.reflect.ClassTag - private[hive] object HiveShim { // Precision and scale to pass for unlimited decimals; these are the same as the precision and // scale Hive 0.13 infers for BigDecimals from sources that don't specify them (e.g. UDFs) @@ -68,10 +69,10 @@ private[hive] object HiveShim { * Cannot use ColumnProjectionUtils.appendReadColumns directly, if ids is null or empty */ def appendReadColumns(conf: Configuration, ids: Seq[Integer], names: Seq[String]) { - if (ids != null && ids.size > 0) { + if (ids != null && ids.nonEmpty) { ColumnProjectionUtils.appendReadColumns(conf, ids) } - if (names != null && names.size > 0) { + if (names != null && names.nonEmpty) { appendReadColumnNames(conf, names) } } @@ -197,11 +198,11 @@ private[hive] object HiveShim { } /* - * Bug introduced in hive-0.13. FileSinkDesc is serializable, but its member path is not. - * Fix it through wrapper. - * */ + * Bug introduced in hive-0.13. FileSinkDesc is serializable, but its member path is not. + * Fix it through wrapper. + */ implicit def wrapperToFileSinkDesc(w: ShimFileSinkDesc): FileSinkDesc = { - var f = new FileSinkDesc(new Path(w.dir), w.tableInfo, w.compressed) + val f = new FileSinkDesc(new Path(w.dir), w.tableInfo, w.compressed) f.setCompressCodec(w.compressCodec) f.setCompressType(w.compressType) f.setTableInfo(w.tableInfo) From bbdfc0a40fb39760c122e7b9ce80aa1e340e55ee Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Mon, 8 Jun 2015 11:34:18 -0700 Subject: [PATCH 07/18] [SPARK-8121] [SQL] Fixes InsertIntoHadoopFsRelation job initialization for Hadoop 1.x For Hadoop 1.x, `TaskAttemptContext` constructor clones the `Configuration` argument, thus configurations done in `HadoopFsRelation.prepareForWriteJob()` are not populated to *driver* side `TaskAttemptContext` (executor side configurations are properly populated). Currently this should only affect Parquet output committer class configuration. 
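For illustration only (not part of the patch): the affected setting is the Parquet output committer override, which is read from the Hadoop configuration. A hypothetical way to set it:

    // With this fix the setting is also visible to the driver-side TaskAttemptContext on Hadoop 1.x.
    sc.hadoopConfiguration.set(
      "spark.sql.parquet.output.committer.class",
      "org.apache.parquet.hadoop.ParquetOutputCommitter")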
Author: Cheng Lian Closes #6669 from liancheng/spark-8121 and squashes the following commits: 73819e8 [Cheng Lian] Minor logging fix fce089c [Cheng Lian] Adds more logging b6f78a6 [Cheng Lian] Fixes compilation error introduced while rebasing 963a1aa [Cheng Lian] Addresses @yhuai's comment c3a0b1a [Cheng Lian] Fixes InsertIntoHadoopFsRelation job initialization --- .../scala/org/apache/spark/sql/SQLConf.scala | 1 + .../apache/spark/sql/parquet/newParquet.scala | 7 +++ .../apache/spark/sql/sources/commands.scala | 18 +++++-- .../spark/sql/parquet/ParquetIOSuite.scala | 52 ++++++++++++++++--- 4 files changed, 65 insertions(+), 13 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala index c778889045d02..be786f9b7f49e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala @@ -76,6 +76,7 @@ private[spark] object SQLConf { // The output committer class used by FSBasedRelation. The specified class needs to be a // subclass of org.apache.hadoop.mapreduce.OutputCommitter. + // NOTE: This property should be set in Hadoop `Configuration` rather than Spark `SQLConf` val OUTPUT_COMMITTER_CLASS = "spark.sql.sources.outputCommitterClass" // Whether to perform eager analysis when constructing a dataframe. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala index 5dda440240e60..7af4eb1ca4716 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala @@ -212,6 +212,13 @@ private[sql] class ParquetRelation2( classOf[ParquetOutputCommitter], classOf[ParquetOutputCommitter]) + if (conf.get("spark.sql.parquet.output.committer.class") == null) { + logInfo("Using default output committer for Parquet: " + + classOf[ParquetOutputCommitter].getCanonicalName) + } else { + logInfo("Using user defined output committer for Parquet: " + committerClass.getCanonicalName) + } + conf.setClass( SQLConf.OUTPUT_COMMITTER_CLASS, committerClass, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala index bd3aad6631748..c94199bfcd233 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala @@ -297,12 +297,16 @@ private[sql] abstract class BaseWriterContainer( def driverSideSetup(): Unit = { setupIDs(0, 0, 0) setupConf() - taskAttemptContext = newTaskAttemptContext(serializableConf.value, taskAttemptId) - // This preparation must happen before initializing output format and output committer, since - // their initialization involves the job configuration, which can be potentially decorated in - // `relation.prepareJobForWrite`. + // Order of the following two lines is important. For Hadoop 1, TaskAttemptContext constructor + // clones the Configuration object passed in. If we initialize the TaskAttemptContext first, + // configurations made in prepareJobForWrite(job) are not populated into the TaskAttemptContext. + // + // Also, the `prepareJobForWrite` call must happen before initializing output format and output + // committer, since their initialization involve the job configuration, which can be potentially + // decorated in `prepareJobForWrite`. 
outputWriterFactory = relation.prepareJobForWrite(job) + taskAttemptContext = newTaskAttemptContext(serializableConf.value, taskAttemptId) outputFormatClass = job.getOutputFormatClass outputCommitter = newOutputCommitter(taskAttemptContext) @@ -331,6 +335,8 @@ private[sql] abstract class BaseWriterContainer( SQLConf.OUTPUT_COMMITTER_CLASS, null, classOf[OutputCommitter]) Option(committerClass).map { clazz => + logInfo(s"Using user defined output committer class ${clazz.getCanonicalName}") + // Every output format based on org.apache.hadoop.mapreduce.lib.output.OutputFormat // has an associated output committer. To override this output committer, // we will first try to use the output committer set in SQLConf.OUTPUT_COMMITTER_CLASS. @@ -350,7 +356,9 @@ private[sql] abstract class BaseWriterContainer( }.getOrElse { // If output committer class is not set, we will use the one associated with the // file output format. - outputFormatClass.newInstance().getOutputCommitter(context) + val outputCommitter = outputFormatClass.newInstance().getOutputCommitter(context) + logInfo(s"Using output committer class ${outputCommitter.getClass.getCanonicalName}") + outputCommitter } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala index 2b6a27032e637..46b25859d9a68 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala @@ -23,16 +23,18 @@ import scala.reflect.runtime.universe.TypeTag import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} -import org.scalatest.BeforeAndAfterAll +import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext} import org.apache.parquet.example.data.simple.SimpleGroup import org.apache.parquet.example.data.{Group, GroupWriter} import org.apache.parquet.hadoop.api.WriteSupport import org.apache.parquet.hadoop.api.WriteSupport.WriteContext -import org.apache.parquet.hadoop.metadata.{ParquetMetadata, FileMetaData, CompressionCodecName} -import org.apache.parquet.hadoop.{Footer, ParquetFileWriter, ParquetWriter} +import org.apache.parquet.hadoop.metadata.{CompressionCodecName, FileMetaData, ParquetMetadata} +import org.apache.parquet.hadoop.{Footer, ParquetFileWriter, ParquetOutputCommitter, ParquetWriter} import org.apache.parquet.io.api.RecordConsumer import org.apache.parquet.schema.{MessageType, MessageTypeParser} +import org.scalatest.BeforeAndAfterAll +import org.apache.spark.SparkException import org.apache.spark.sql.catalyst.ScalaReflection import org.apache.spark.sql.catalyst.expressions.Row import org.apache.spark.sql.catalyst.util.DateUtils @@ -196,7 +198,7 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest { withParquetDataFrame(allNulls :: Nil) { df => val rows = df.collect() - assert(rows.size === 1) + assert(rows.length === 1) assert(rows.head === Row(Seq.fill(5)(null): _*)) } } @@ -209,7 +211,7 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest { withParquetDataFrame(allNones :: Nil) { df => val rows = df.collect() - assert(rows.size === 1) + assert(rows.length === 1) assert(rows.head === Row(Seq.fill(3)(null): _*)) } } @@ -379,6 +381,8 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest { } test("SPARK-6352 DirectParquetOutputCommitter") { + val clonedConf = new Configuration(configuration) + // Write to a parquet file and let it fail. 
// _temporary should be missing if direct output committer works. try { @@ -393,14 +397,46 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest { val fs = path.getFileSystem(configuration) assert(!fs.exists(path)) } + } finally { + // Hadoop 1 doesn't have `Configuration.unset` + configuration.clear() + clonedConf.foreach(entry => configuration.set(entry.getKey, entry.getValue)) } - finally { - configuration.set("spark.sql.parquet.output.committer.class", - "org.apache.parquet.hadoop.ParquetOutputCommitter") + } + + test("SPARK-8121: spark.sql.parquet.output.committer.class shouldn't be overriden") { + withTempPath { dir => + val clonedConf = new Configuration(configuration) + + configuration.set( + SQLConf.OUTPUT_COMMITTER_CLASS, classOf[ParquetOutputCommitter].getCanonicalName) + + configuration.set( + "spark.sql.parquet.output.committer.class", + classOf[BogusParquetOutputCommitter].getCanonicalName) + + try { + val message = intercept[SparkException] { + sqlContext.range(0, 1).write.parquet(dir.getCanonicalPath) + }.getCause.getMessage + assert(message === "Intentional exception for testing purposes") + } finally { + // Hadoop 1 doesn't have `Configuration.unset` + configuration.clear() + clonedConf.foreach(entry => configuration.set(entry.getKey, entry.getValue)) + } } } } +class BogusParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext) + extends ParquetOutputCommitter(outputPath, context) { + + override def commitJob(jobContext: JobContext): Unit = { + sys.error("Intentional exception for testing purposes") + } +} + class ParquetDataSourceOnIOSuite extends ParquetIOSuiteBase with BeforeAndAfterAll { private lazy val originalConf = sqlContext.conf.parquetUseDataSourceApi From fe7669d3072b72954ad0c3f2f8846a0fde839ead Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Mon, 8 Jun 2015 11:52:02 -0700 Subject: [PATCH 08/18] [SQL][minor] remove duplicated cases in `DecimalPrecision` We already have a rule to do type coercion for fixed decimal and unlimited decimal in `WidenTypes`, so we don't need to handle them in `DecimalPrecision`. 
Author: Wenchen Fan Closes #6698 from cloud-fan/fix and squashes the following commits: 413ad4a [Wenchen Fan] remove duplicated cases --- .../spark/sql/catalyst/analysis/HiveTypeCoercion.scala | 6 ------ 1 file changed, 6 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala index a42ffce0d26fa..737905c3582ba 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala @@ -445,12 +445,6 @@ trait HiveTypeCoercion { e2 @ DecimalType.Expression(p2, s2)) if p1 != p2 || s1 != s2 => val resultType = DecimalType(max(p1, p2), max(s1, s2)) b.makeCopy(Array(Cast(e1, resultType), Cast(e2, resultType))) - case b @ BinaryComparison(e1 @ DecimalType.Fixed(_, _), e2) - if e2.dataType == DecimalType.Unlimited => - b.makeCopy(Array(Cast(e1, DecimalType.Unlimited), e2)) - case b @ BinaryComparison(e1, e2 @ DecimalType.Fixed(_, _)) - if e1.dataType == DecimalType.Unlimited => - b.makeCopy(Array(e1, Cast(e2, DecimalType.Unlimited))) // Promote integers inside a binary expression with fixed-precision decimals to decimals, // and fixed-precision decimals in an expression with floats / doubles to doubles From 51853891686f353dc9decc31066b0de01ed8b49e Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Mon, 8 Jun 2015 13:15:44 -0700 Subject: [PATCH 09/18] [SPARK-8148] Do not use FloatType in partition column inference. Use DoubleType instead to be more stable and robust. Author: Reynold Xin Closes #6692 from rxin/SPARK-8148 and squashes the following commits: 6742ecc [Reynold Xin] [SPARK-8148] Do not use FloatType in partition column inference. --- .../spark/sql/sources/PartitioningUtils.scala | 16 +++++++++------- .../parquet/ParquetPartitionDiscoverySuite.scala | 12 ++++++------ 2 files changed, 15 insertions(+), 13 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala index 9f6ec2ed8fc8d..7a2b5b949dd4e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.sources -import java.lang.{Double => JDouble, Float => JFloat, Long => JLong} +import java.lang.{Double => JDouble, Float => JFloat, Integer => JInteger, Long => JLong} import java.math.{BigDecimal => JBigDecimal} import scala.collection.mutable.ArrayBuffer @@ -178,7 +178,7 @@ private[sql] object PartitioningUtils { * {{{ * NullType -> * IntegerType -> LongType -> - * FloatType -> DoubleType -> DecimalType.Unlimited -> + * DoubleType -> DecimalType.Unlimited -> * StringType * }}} */ @@ -208,8 +208,8 @@ private[sql] object PartitioningUtils { } /** - * Converts a string to a `Literal` with automatic type inference. Currently only supports - * [[IntegerType]], [[LongType]], [[FloatType]], [[DoubleType]], [[DecimalType.Unlimited]], and + * Converts a string to a [[Literal]] with automatic type inference. Currently only supports + * [[IntegerType]], [[LongType]], [[DoubleType]], [[DecimalType.Unlimited]], and * [[StringType]]. 
*/ private[sql] def inferPartitionColumnValue( @@ -221,13 +221,15 @@ private[sql] object PartitioningUtils { Try(Literal.create(Integer.parseInt(raw), IntegerType)) .orElse(Try(Literal.create(JLong.parseLong(raw), LongType))) // Then falls back to fractional types - .orElse(Try(Literal.create(JFloat.parseFloat(raw), FloatType))) .orElse(Try(Literal.create(JDouble.parseDouble(raw), DoubleType))) .orElse(Try(Literal.create(new JBigDecimal(raw), DecimalType.Unlimited))) // Then falls back to string .getOrElse { - if (raw == defaultPartitionName) Literal.create(null, NullType) - else Literal.create(unescapePathName(raw), StringType) + if (raw == defaultPartitionName) { + Literal.create(null, NullType) + } else { + Literal.create(unescapePathName(raw), StringType) + } } } else { if (raw == defaultPartitionName) { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala index c2f1cc8ffd1fb..3240079483545 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala @@ -53,7 +53,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest { check("10", Literal.create(10, IntegerType)) check("1000000000000000", Literal.create(1000000000000000L, LongType)) - check("1.5", Literal.create(1.5f, FloatType)) + check("1.5", Literal.create(1.5, DoubleType)) check("hello", Literal.create("hello", StringType)) check(defaultPartitionName, Literal.create(null, NullType)) } @@ -83,13 +83,13 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest { ArrayBuffer( Literal.create(10, IntegerType), Literal.create("hello", StringType), - Literal.create(1.5f, FloatType))) + Literal.create(1.5, DoubleType))) }) check("file://path/a=10/b_hello/c=1.5", Some { PartitionValues( ArrayBuffer("c"), - ArrayBuffer(Literal.create(1.5f, FloatType))) + ArrayBuffer(Literal.create(1.5, DoubleType))) }) check("file:///", None) @@ -121,7 +121,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest { "hdfs://host:9000/path/a=10.5/b=hello"), PartitionSpec( StructType(Seq( - StructField("a", FloatType), + StructField("a", DoubleType), StructField("b", StringType))), Seq( Partition(Row(10, "20"), "hdfs://host:9000/path/a=10/b=20"), @@ -140,7 +140,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest { "hdfs://host:9000/path/a=10.5/b=world/_temporary/path"), PartitionSpec( StructType(Seq( - StructField("a", FloatType), + StructField("a", DoubleType), StructField("b", StringType))), Seq( Partition(Row(10, "20"), "hdfs://host:9000/path/a=10/b=20"), @@ -162,7 +162,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest { s"hdfs://host:9000/path/a=10.5/b=$defaultPartitionName"), PartitionSpec( StructType(Seq( - StructField("a", FloatType), + StructField("a", DoubleType), StructField("b", StringType))), Seq( Partition(Row(10, null), s"hdfs://host:9000/path/a=10/b=$defaultPartitionName"), From f3eec92ce7e13cc461d2f0404f26730259210f12 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Mon, 8 Jun 2015 18:09:21 -0700 Subject: [PATCH 10/18] [SPARK-8162] [HOTFIX] Fix NPE in spark-shell This was caused by this commit: f271347 This patch does not attempt to fix the root cause of why the `VisibleForTesting` annotation causes a NPE in the shell. We should find a way to fix that separately. 
Author: Andrew Or Closes #6711 from andrewor14/fix-spark-shell and squashes the following commits: bf62ecc [Andrew Or] Prevent NPE in spark-shell --- .../scala/org/apache/spark/ui/jobs/JobProgressListener.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala index 730f9806e518e..0c854f04890b6 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala @@ -539,11 +539,11 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { /** * For testing only. Wait until at least `numExecutors` executors are up, or throw * `TimeoutException` if the waiting time elapsed before `numExecutors` executors up. + * Exposed for testing. * * @param numExecutors the number of executors to wait at least * @param timeout time to wait in milliseconds */ - @VisibleForTesting private[spark] def waitUntilExecutorsUp(numExecutors: Int, timeout: Long): Unit = { val finishTime = System.currentTimeMillis() + timeout while (System.currentTimeMillis() < finishTime) { From 82870d507dfaeeaf315d6766ca1496205c6216d3 Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Mon, 8 Jun 2015 21:33:47 -0700 Subject: [PATCH 11/18] [SPARK-8168] [MLLIB] Add Python friendly constructor to PipelineModel This makes the constructor callable in Python. dbtsai Author: Xiangrui Meng Closes #6709 from mengxr/SPARK-8168 and squashes the following commits: f871de4 [Xiangrui Meng] Add Python friendly constructor to PipelineModel --- .../scala/org/apache/spark/ml/Pipeline.scala | 8 ++++++++ .../org/apache/spark/ml/PipelineSuite.scala | 17 +++++++++++++++++ 2 files changed, 25 insertions(+) diff --git a/mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala b/mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala index 11a4722722ea1..a9bd28df71ee1 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala @@ -17,6 +17,9 @@ package org.apache.spark.ml +import java.{util => ju} + +import scala.collection.JavaConverters._ import scala.collection.mutable.ListBuffer import org.apache.spark.Logging @@ -175,6 +178,11 @@ class PipelineModel private[ml] ( val stages: Array[Transformer]) extends Model[PipelineModel] with Logging { + /** A Java/Python-friendly auxiliary constructor. 
*/ + private[ml] def this(uid: String, stages: ju.List[Transformer]) = { + this(uid, stages.asScala.toArray) + } + override def validateParams(): Unit = { super.validateParams() stages.foreach(_.validateParams()) diff --git a/mllib/src/test/scala/org/apache/spark/ml/PipelineSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/PipelineSuite.scala index 05bf58e63abaf..29394fefcbc43 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/PipelineSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/PipelineSuite.scala @@ -17,6 +17,8 @@ package org.apache.spark.ml +import scala.collection.JavaConverters._ + import org.mockito.Matchers.{any, eq => meq} import org.mockito.Mockito.when import org.scalatest.mock.MockitoSugar.mock @@ -81,4 +83,19 @@ class PipelineSuite extends SparkFunSuite { pipeline.fit(dataset) } } + + test("pipeline model constructors") { + val transform0 = mock[Transformer] + val model1 = mock[MyModel] + + val stages = Array(transform0, model1) + val pipelineModel0 = new PipelineModel("pipeline0", stages) + assert(pipelineModel0.uid === "pipeline0") + assert(pipelineModel0.stages === stages) + + val stagesAsList = stages.toList.asJava + val pipelineModel1 = new PipelineModel("pipeline1", stagesAsList) + assert(pipelineModel1.uid === "pipeline1") + assert(pipelineModel1.stages === stages) + } } From a5c52c1a3488b69bec19e460d2d1fdb0c9ada58d Mon Sep 17 00:00:00 2001 From: hqzizania Date: Mon, 8 Jun 2015 21:40:12 -0700 Subject: [PATCH 12/18] [SPARK-6820] [SPARKR] Convert NAs to null type in SparkR DataFrames Author: hqzizania Closes #6190 from hqzizania/R and squashes the following commits: 1641f9e [hqzizania] fixes and add test units bb3411a [hqzizania] Convert NAs to null type in SparkR DataFrames --- R/pkg/R/serialize.R | 8 +++++++ R/pkg/inst/tests/test_sparkSQL.R | 37 ++++++++++++++++++++++++++++++++ 2 files changed, 45 insertions(+) diff --git a/R/pkg/R/serialize.R b/R/pkg/R/serialize.R index 2081786e6f833..3169d7968f8fe 100644 --- a/R/pkg/R/serialize.R +++ b/R/pkg/R/serialize.R @@ -37,6 +37,14 @@ writeObject <- function(con, object, writeType = TRUE) { # passing in vectors as arrays and instead require arrays to be passed # as lists. 
type <- class(object)[[1]] # class of POSIXlt is c("POSIXlt", "POSIXt") + # Checking types is needed here, since ‘is.na’ only handles atomic vectors, + # lists and pairlists + if (type %in% c("integer", "character", "logical", "double", "numeric")) { + if (is.na(object)) { + object <- NULL + type <- "NULL" + } + } if (writeType) { writeType(con, type) } diff --git a/R/pkg/inst/tests/test_sparkSQL.R b/R/pkg/inst/tests/test_sparkSQL.R index 30edfc8a7bd94..8946348ef801c 100644 --- a/R/pkg/inst/tests/test_sparkSQL.R +++ b/R/pkg/inst/tests/test_sparkSQL.R @@ -101,6 +101,43 @@ test_that("create DataFrame from RDD", { expect_equal(dtypes(df), list(c("a", "int"), c("b", "string"))) }) +test_that("convert NAs to null type in DataFrames", { + rdd <- parallelize(sc, list(list(1L, 2L), list(NA, 4L))) + df <- createDataFrame(sqlContext, rdd, list("a", "b")) + expect_true(is.na(collect(df)[2, "a"])) + expect_equal(collect(df)[2, "b"], 4L) + + l <- data.frame(x = 1L, y = c(1L, NA_integer_, 3L)) + df <- createDataFrame(sqlContext, l) + expect_equal(collect(df)[2, "x"], 1L) + expect_true(is.na(collect(df)[2, "y"])) + + rdd <- parallelize(sc, list(list(1, 2), list(NA, 4))) + df <- createDataFrame(sqlContext, rdd, list("a", "b")) + expect_true(is.na(collect(df)[2, "a"])) + expect_equal(collect(df)[2, "b"], 4) + + l <- data.frame(x = 1, y = c(1, NA_real_, 3)) + df <- createDataFrame(sqlContext, l) + expect_equal(collect(df)[2, "x"], 1) + expect_true(is.na(collect(df)[2, "y"])) + + l <- list("a", "b", NA, "d") + df <- createDataFrame(sqlContext, l) + expect_true(is.na(collect(df)[3, "_1"])) + expect_equal(collect(df)[4, "_1"], "d") + + l <- list("a", "b", NA_character_, "d") + df <- createDataFrame(sqlContext, l) + expect_true(is.na(collect(df)[3, "_1"])) + expect_equal(collect(df)[4, "_1"], "d") + + l <- list(TRUE, FALSE, NA, TRUE) + df <- createDataFrame(sqlContext, l) + expect_true(is.na(collect(df)[3, "_1"])) + expect_equal(collect(df)[4, "_1"], TRUE) +}) + test_that("toDF", { rdd <- lapply(parallelize(sc, 1:10), function(x) { list(x, as.character(x)) }) df <- toDF(rdd, list("a", "b")) From 7658eb28a2ea28c06e3b5a26f7734a7dc36edc19 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Mon, 8 Jun 2015 23:27:05 -0700 Subject: [PATCH 13/18] [SPARK-7990][SQL] Add methods to facilitate equi-join on multiple joining keys JIRA: https://issues.apache.org/jira/browse/SPARK-7990 Author: Liang-Chi Hsieh Closes #6616 from viirya/multi_keys_equi_join and squashes the following commits: cd5c888 [Liang-Chi Hsieh] Import reduce in python3. c43722c [Liang-Chi Hsieh] For comments. 0400e89 [Liang-Chi Hsieh] Fix scala style. cc90015 [Liang-Chi Hsieh] Add methods to facilitate equi-join on multiple joining keys. 
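An illustrative sketch of the new Scala API (column and table names are invented); as in SQL's JOIN USING, the join columns appear only once in the result:

    import sqlContext.implicits._
    val left = Seq((1, 2, "x")).toDF("k1", "k2", "v1")
    val right = Seq((1, 2, "y")).toDF("k1", "k2", "v2")
    left.join(right, Seq("k1", "k2")).show() // result columns: k1, k2, v1, v2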
--- python/pyspark/sql/dataframe.py | 45 +++++++++++++------ .../org/apache/spark/sql/DataFrame.scala | 40 ++++++++++++++--- .../apache/spark/sql/DataFrameJoinSuite.scala | 9 ++++ 3 files changed, 75 insertions(+), 19 deletions(-) diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index 2d8c59518b35a..e9dd05e2d0c7a 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -22,6 +22,7 @@ if sys.version >= '3': basestring = unicode = str long = int + from functools import reduce else: from itertools import imap as map @@ -503,36 +504,52 @@ def alias(self, alias): @ignore_unicode_prefix @since(1.3) - def join(self, other, joinExprs=None, joinType=None): + def join(self, other, on=None, how=None): """Joins with another :class:`DataFrame`, using the given join expression. The following performs a full outer join between ``df1`` and ``df2``. :param other: Right side of the join - :param joinExprs: a string for join column name, or a join expression (Column). - If joinExprs is a string indicating the name of the join column, - the column must exist on both sides, and this performs an inner equi-join. - :param joinType: str, default 'inner'. + :param on: a string for join column name, a list of column names, + , a join expression (Column) or a list of Columns. + If `on` is a string or a list of string indicating the name of the join column(s), + the column(s) must exist on both sides, and this performs an inner equi-join. + :param how: str, default 'inner'. One of `inner`, `outer`, `left_outer`, `right_outer`, `semijoin`. >>> df.join(df2, df.name == df2.name, 'outer').select(df.name, df2.height).collect() [Row(name=None, height=80), Row(name=u'Alice', height=None), Row(name=u'Bob', height=85)] + >>> cond = [df.name == df3.name, df.age == df3.age] + >>> df.join(df3, cond, 'outer').select(df.name, df3.age).collect() + [Row(name=u'Bob', age=5), Row(name=u'Alice', age=2)] + >>> df.join(df2, 'name').select(df.name, df2.height).collect() [Row(name=u'Bob', height=85)] + + >>> df.join(df4, ['name', 'age']).select(df.name, df.age).collect() + [Row(name=u'Bob', age=5)] """ - if joinExprs is None: + if on is not None and not isinstance(on, list): + on = [on] + + if on is None or len(on) == 0: jdf = self._jdf.join(other._jdf) - elif isinstance(joinExprs, basestring): - jdf = self._jdf.join(other._jdf, joinExprs) + + if isinstance(on[0], basestring): + jdf = self._jdf.join(other._jdf, self._jseq(on)) else: - assert isinstance(joinExprs, Column), "joinExprs should be Column" - if joinType is None: - jdf = self._jdf.join(other._jdf, joinExprs._jc) + assert isinstance(on[0], Column), "on should be Column or list of Column" + if len(on) > 1: + on = reduce(lambda x, y: x.__and__(y), on) + else: + on = on[0] + if how is None: + jdf = self._jdf.join(other._jdf, on._jc, "inner") else: - assert isinstance(joinType, basestring), "joinType should be basestring" - jdf = self._jdf.join(other._jdf, joinExprs._jc, joinType) + assert isinstance(how, basestring), "how should be basestring" + jdf = self._jdf.join(other._jdf, on._jc, how) return DataFrame(jdf, self.sql_ctx) @ignore_unicode_prefix @@ -1315,6 +1332,8 @@ def _test(): .toDF(StructType([StructField('age', IntegerType()), StructField('name', StringType())])) globs['df2'] = sc.parallelize([Row(name='Tom', height=80), Row(name='Bob', height=85)]).toDF() + globs['df3'] = sc.parallelize([Row(name='Alice', age=2), + Row(name='Bob', age=5)]).toDF() globs['df4'] = sc.parallelize([Row(name='Alice', age=10, height=80), 
Row(name='Bob', age=5, height=None), Row(name='Tom', age=None, height=None), diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala index 4a224153e1a37..59f64dd4bc648 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala @@ -395,22 +395,50 @@ class DataFrame private[sql]( * @since 1.4.0 */ def join(right: DataFrame, usingColumn: String): DataFrame = { + join(right, Seq(usingColumn)) + } + + /** + * Inner equi-join with another [[DataFrame]] using the given columns. + * + * Different from other join functions, the join columns will only appear once in the output, + * i.e. similar to SQL's `JOIN USING` syntax. + * + * {{{ + * // Joining df1 and df2 using the columns "user_id" and "user_name" + * df1.join(df2, Seq("user_id", "user_name")) + * }}} + * + * Note that if you perform a self-join using this function without aliasing the input + * [[DataFrame]]s, you will NOT be able to reference any columns after the join, since + * there is no way to disambiguate which side of the join you would like to reference. + * + * @param right Right side of the join operation. + * @param usingColumns Names of the columns to join on. This columns must exist on both sides. + * @group dfops + * @since 1.4.0 + */ + def join(right: DataFrame, usingColumns: Seq[String]): DataFrame = { // Analyze the self join. The assumption is that the analyzer will disambiguate left vs right // by creating a new instance for one of the branch. val joined = sqlContext.executePlan( Join(logicalPlan, right.logicalPlan, joinType = Inner, None)).analyzed.asInstanceOf[Join] - // Project only one of the join column. - val joinedCol = joined.right.resolve(usingColumn) + // Project only one of the join columns. 
+ val joinedCols = usingColumns.map(col => joined.right.resolve(col)) + val condition = usingColumns.map { col => + catalyst.expressions.EqualTo(joined.left.resolve(col), joined.right.resolve(col)) + }.reduceLeftOption[catalyst.expressions.BinaryExpression] { (cond, eqTo) => + catalyst.expressions.And(cond, eqTo) + } + Project( - joined.output.filterNot(_ == joinedCol), + joined.output.filterNot(joinedCols.contains(_)), Join( joined.left, joined.right, joinType = Inner, - Some(catalyst.expressions.EqualTo( - joined.left.resolve(usingColumn), - joined.right.resolve(usingColumn)))) + condition) ) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala index 051d13e9a544f..6165764632c29 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala @@ -34,6 +34,15 @@ class DataFrameJoinSuite extends QueryTest { Row(1, "1", "2") :: Row(2, "2", "3") :: Row(3, "3", "4") :: Nil) } + test("join - join using multiple columns") { + val df = Seq(1, 2, 3).map(i => (i, i + 1, i.toString)).toDF("int", "int2", "str") + val df2 = Seq(1, 2, 3).map(i => (i, i + 1, (i + 1).toString)).toDF("int", "int2", "str") + + checkAnswer( + df.join(df2, Seq("int", "int2")), + Row(1, 2, "1", "2") :: Row(2, 3, "2", "3") :: Row(3, 4, "3", "4") :: Nil) + } + test("join - join using self join") { val df = Seq(1, 2, 3).map(i => (i, i.toString)).toDF("int", "str") From 0902a11940e550e85a53e110b490fe90e16ddaf4 Mon Sep 17 00:00:00 2001 From: Sean Owen Date: Tue, 9 Jun 2015 08:00:04 +0100 Subject: [PATCH 14/18] [SPARK-8101] [CORE] Upgrade netty to avoid memory leak accord to netty #3837 issues Update to Netty 4.0.28-Final Author: Sean Owen Closes #6701 from srowen/SPARK-8101 and squashes the following commits: f3b6369 [Sean Owen] Update to Netty 4.0.28-Final --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 5a5d183e3dcca..e9700a5d7b149 100644 --- a/pom.xml +++ b/pom.xml @@ -587,7 +587,7 @@ io.netty netty-all - 4.0.23.Final + 4.0.28.Final org.apache.derby From 1b499993ad185b04dd5065facb565cbe7e249521 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Tue, 9 Jun 2015 16:24:38 +0800 Subject: [PATCH 15/18] [SPARK-7886] Add built-in expressions to FunctionRegistry. This patch switches to using FunctionRegistry for built-in expressions. It is based on #6463, but with some work to simplify it along with unit tests. TODOs for future pull requests: - Use static registration so we don't need to register all functions every time we start a new SQLContext - Switch to using this in HiveContext Author: Reynold Xin Author: Santiago M. Mola Closes #6710 from rxin/udf-registry and squashes the following commits: 6930822 [Reynold Xin] Fixed Python test. b802c9a [Reynold Xin] Made UDF case insensitive. e60d815 [Reynold Xin] Made UDF case insensitive. 852f9c0 [Reynold Xin] Fixed style violation. e76a3c1 [Reynold Xin] Fixed parser. 52ddaba [Reynold Xin] Fixed compilation. ee7854f [Reynold Xin] Improved error reporting. ff906f2 [Reynold Xin] More robust constructor calling. 77b46f1 [Reynold Xin] Simplified the code. 2a2a149 [Reynold Xin] Merge pull request #6463 from smola/SPARK-7886 8616924 [Santiago M. Mola] [SPARK-7886] Add built-in expressions to FunctionRegistry. 
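As a usage note on the multi-column DataFrame join added in the patch above (the `join(right, usingColumns: Seq[String])` overload and the matching Python `on` handling), the sketch below shows the two equivalent call styles in Scala. It is illustrative only: the `left`/`right` DataFrames, their column names, and the surrounding `sqlContext` are assumptions for this example and do not come from the patch itself.

{% highlight scala %}
// Assumes an existing SQLContext named sqlContext; toDF comes from its implicits.
import sqlContext.implicits._

val left  = Seq((1, 2, "a"), (2, 3, "b"), (3, 4, "c")).toDF("int", "int2", "str")
val right = Seq((1, 2, "x"), (2, 3, "y"), (3, 4, "z")).toDF("int", "int2", "str2")

// JOIN USING form added by the patch: each join column appears only once in the output.
val usingJoin = left.join(right, Seq("int", "int2"))

// Equivalent inner join written as an explicit Column condition; here both sides'
// join columns remain in the output, so they must be disambiguated when selected.
val exprJoin = left.join(right,
  left("int") === right("int") && left("int2") === right("int2"), "inner")
{% endhighlight %}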
--- python/pyspark/sql/dataframe.py | 2 +- .../apache/spark/sql/catalyst/SqlParser.scala | 75 +++++------ .../sql/catalyst/analysis/Analyzer.scala | 4 +- .../catalyst/analysis/FunctionRegistry.scala | 127 +++++++++++++----- .../sql/catalyst/expressions/Expression.scala | 9 ++ .../sql/catalyst/expressions/random.scala | 23 +++- .../expressions/stringOperations.scala | 7 + .../sql/catalyst/util/StringKeyHashMap.scala | 44 ++++++ .../org/apache/spark/sql/SQLContext.scala | 6 +- .../scala/org/apache/spark/sql/UDFSuite.scala | 42 ++++++ .../apache/spark/sql/hive/HiveContext.scala | 9 +- .../org/apache/spark/sql/hive/hiveUdfs.scala | 14 +- 12 files changed, 269 insertions(+), 93 deletions(-) create mode 100644 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringKeyHashMap.scala diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index e9dd05e2d0c7a..9615e576497cd 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -746,7 +746,7 @@ def selectExpr(self, *expr): This is a variant of :func:`select` that accepts SQL expressions. >>> df.selectExpr("age * 2", "abs(age)").collect() - [Row((age * 2)=4, Abs(age)=2), Row((age * 2)=10, Abs(age)=5)] + [Row((age * 2)=4, 'abs(age)=2), Row((age * 2)=10, 'abs(age)=5)] """ if len(expr) == 1 and isinstance(expr[0], list): expr = expr[0] diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala index e85312aee7d16..f74c17d583359 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst import scala.language.implicitConversions +import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans._ @@ -48,26 +49,21 @@ class SqlParser extends AbstractSparkSQLParser with DataTypeParser { // Keyword is a convention with AbstractSparkSQLParser, which will scan all of the `Keyword` // properties via reflection the class in runtime for constructing the SqlLexical object - protected val ABS = Keyword("ABS") protected val ALL = Keyword("ALL") protected val AND = Keyword("AND") protected val APPROXIMATE = Keyword("APPROXIMATE") protected val AS = Keyword("AS") protected val ASC = Keyword("ASC") - protected val AVG = Keyword("AVG") protected val BETWEEN = Keyword("BETWEEN") protected val BY = Keyword("BY") protected val CASE = Keyword("CASE") protected val CAST = Keyword("CAST") - protected val COALESCE = Keyword("COALESCE") - protected val COUNT = Keyword("COUNT") protected val DESC = Keyword("DESC") protected val DISTINCT = Keyword("DISTINCT") protected val ELSE = Keyword("ELSE") protected val END = Keyword("END") protected val EXCEPT = Keyword("EXCEPT") protected val FALSE = Keyword("FALSE") - protected val FIRST = Keyword("FIRST") protected val FROM = Keyword("FROM") protected val FULL = Keyword("FULL") protected val GROUP = Keyword("GROUP") @@ -80,13 +76,9 @@ class SqlParser extends AbstractSparkSQLParser with DataTypeParser { protected val INTO = Keyword("INTO") protected val IS = Keyword("IS") protected val JOIN = Keyword("JOIN") - protected val LAST = Keyword("LAST") protected val LEFT = Keyword("LEFT") protected val LIKE = Keyword("LIKE") protected val LIMIT = Keyword("LIMIT") - protected val LOWER = 
Keyword("LOWER") - protected val MAX = Keyword("MAX") - protected val MIN = Keyword("MIN") protected val NOT = Keyword("NOT") protected val NULL = Keyword("NULL") protected val ON = Keyword("ON") @@ -100,15 +92,10 @@ class SqlParser extends AbstractSparkSQLParser with DataTypeParser { protected val RLIKE = Keyword("RLIKE") protected val SELECT = Keyword("SELECT") protected val SEMI = Keyword("SEMI") - protected val SQRT = Keyword("SQRT") - protected val SUBSTR = Keyword("SUBSTR") - protected val SUBSTRING = Keyword("SUBSTRING") - protected val SUM = Keyword("SUM") protected val TABLE = Keyword("TABLE") protected val THEN = Keyword("THEN") protected val TRUE = Keyword("TRUE") protected val UNION = Keyword("UNION") - protected val UPPER = Keyword("UPPER") protected val WHEN = Keyword("WHEN") protected val WHERE = Keyword("WHERE") protected val WITH = Keyword("WITH") @@ -277,25 +264,36 @@ class SqlParser extends AbstractSparkSQLParser with DataTypeParser { ) protected lazy val function: Parser[Expression] = - ( SUM ~> "(" ~> expression <~ ")" ^^ { case exp => Sum(exp) } - | SUM ~> "(" ~> DISTINCT ~> expression <~ ")" ^^ { case exp => SumDistinct(exp) } - | COUNT ~ "(" ~> "*" <~ ")" ^^ { case _ => Count(Literal(1)) } - | COUNT ~ "(" ~> expression <~ ")" ^^ { case exp => Count(exp) } - | COUNT ~> "(" ~> DISTINCT ~> repsep(expression, ",") <~ ")" ^^ - { case exps => CountDistinct(exps) } - | APPROXIMATE ~ COUNT ~ "(" ~ DISTINCT ~> expression <~ ")" ^^ - { case exp => ApproxCountDistinct(exp) } - | APPROXIMATE ~> "(" ~> floatLit ~ ")" ~ COUNT ~ "(" ~ DISTINCT ~ expression <~ ")" ^^ - { case s ~ _ ~ _ ~ _ ~ _ ~ e => ApproxCountDistinct(e, s.toDouble) } - | FIRST ~ "(" ~> expression <~ ")" ^^ { case exp => First(exp) } - | LAST ~ "(" ~> expression <~ ")" ^^ { case exp => Last(exp) } - | AVG ~ "(" ~> expression <~ ")" ^^ { case exp => Average(exp) } - | MIN ~ "(" ~> expression <~ ")" ^^ { case exp => Min(exp) } - | MAX ~ "(" ~> expression <~ ")" ^^ { case exp => Max(exp) } - | UPPER ~ "(" ~> expression <~ ")" ^^ { case exp => Upper(exp) } - | LOWER ~ "(" ~> expression <~ ")" ^^ { case exp => Lower(exp) } - | IF ~ "(" ~> expression ~ ("," ~> expression) ~ ("," ~> expression) <~ ")" ^^ - { case c ~ t ~ f => If(c, t, f) } + ( ident <~ ("(" ~ "*" ~ ")") ^^ { case udfName => + if (lexical.normalizeKeyword(udfName) == "count") { + Count(Literal(1)) + } else { + throw new AnalysisException(s"invalid expression $udfName(*)") + } + } + | ident ~ ("(" ~> repsep(expression, ",")) <~ ")" ^^ + { case udfName ~ exprs => UnresolvedFunction(udfName, exprs) } + | ident ~ ("(" ~ DISTINCT ~> repsep(expression, ",")) <~ ")" ^^ { case udfName ~ exprs => + lexical.normalizeKeyword(udfName) match { + case "sum" => SumDistinct(exprs.head) + case "count" => CountDistinct(exprs) + } + } + | APPROXIMATE ~> ident ~ ("(" ~ DISTINCT ~> expression <~ ")") ^^ { case udfName ~ exp => + if (lexical.normalizeKeyword(udfName) == "count") { + ApproxCountDistinct(exp) + } else { + throw new AnalysisException(s"invalid function approximate $udfName") + } + } + | APPROXIMATE ~> "(" ~> floatLit ~ ")" ~ ident ~ "(" ~ DISTINCT ~ expression <~ ")" ^^ + { case s ~ _ ~ udfName ~ _ ~ _ ~ exp => + if (lexical.normalizeKeyword(udfName) == "count") { + ApproxCountDistinct(exp, s.toDouble) + } else { + throw new AnalysisException(s"invalid function approximate($floatLit) $udfName") + } + } | CASE ~> expression.? ~ rep1(WHEN ~> expression ~ (THEN ~> expression)) ~ (ELSE ~> expression).? 
<~ END ^^ { case casePart ~ altPart ~ elsePart => @@ -304,16 +302,7 @@ class SqlParser extends AbstractSparkSQLParser with DataTypeParser { } ++ elsePart casePart.map(CaseKeyWhen(_, branches)).getOrElse(CaseWhen(branches)) } - | (SUBSTR | SUBSTRING) ~ "(" ~> expression ~ ("," ~> expression) <~ ")" ^^ - { case s ~ p => Substring(s, p, Literal(Integer.MAX_VALUE)) } - | (SUBSTR | SUBSTRING) ~ "(" ~> expression ~ ("," ~> expression) ~ ("," ~> expression) <~ ")" ^^ - { case s ~ p ~ l => Substring(s, p, l) } - | COALESCE ~ "(" ~> repsep(expression, ",") <~ ")" ^^ { case exprs => Coalesce(exprs) } - | SQRT ~ "(" ~> expression <~ ")" ^^ { case exp => Sqrt(exp) } - | ABS ~ "(" ~> expression <~ ")" ^^ { case exp => Abs(exp) } - | ident ~ ("(" ~> repsep(expression, ",")) <~ ")" ^^ - { case udfName ~ exprs => UnresolvedFunction(udfName, exprs) } - ) + ) protected lazy val cast: Parser[Expression] = CAST ~ "(" ~> expression ~ (AS ~> dataType) <~ ")" ^^ { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 5883d938b676d..02b10c444d1a7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -461,7 +461,9 @@ class Analyzer( case q: LogicalPlan => q transformExpressions { case u @ UnresolvedFunction(name, children) if u.childrenResolved => - registry.lookupFunction(name, children) + withPosition(u) { + registry.lookupFunction(name, children) + } } } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala index 0849faa9bfa7b..406f6fad8413b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala @@ -17,24 +17,27 @@ package org.apache.spark.sql.catalyst.analysis -import org.apache.spark.sql.catalyst.CatalystConf -import org.apache.spark.sql.catalyst.expressions.Expression -import scala.collection.mutable +import scala.reflect.ClassTag +import scala.util.{Failure, Success, Try} + +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.util.StringKeyHashMap + /** A catalog for looking up user defined functions, used by an [[Analyzer]]. 
*/ trait FunctionRegistry { - type FunctionBuilder = Seq[Expression] => Expression def registerFunction(name: String, builder: FunctionBuilder): Unit + @throws[AnalysisException]("If function does not exist") def lookupFunction(name: String, children: Seq[Expression]): Expression - - def conf: CatalystConf } trait OverrideFunctionRegistry extends FunctionRegistry { - val functionBuilders = StringKeyHashMap[FunctionBuilder](conf.caseSensitiveAnalysis) + private val functionBuilders = StringKeyHashMap[FunctionBuilder](caseSensitive = false) override def registerFunction(name: String, builder: FunctionBuilder): Unit = { functionBuilders.put(name, builder) @@ -45,16 +48,19 @@ trait OverrideFunctionRegistry extends FunctionRegistry { } } -class SimpleFunctionRegistry(val conf: CatalystConf) extends FunctionRegistry { +class SimpleFunctionRegistry extends FunctionRegistry { - val functionBuilders = StringKeyHashMap[FunctionBuilder](conf.caseSensitiveAnalysis) + private val functionBuilders = StringKeyHashMap[FunctionBuilder](caseSensitive = false) override def registerFunction(name: String, builder: FunctionBuilder): Unit = { functionBuilders.put(name, builder) } override def lookupFunction(name: String, children: Seq[Expression]): Expression = { - functionBuilders(name)(children) + val func = functionBuilders.get(name).getOrElse { + throw new AnalysisException(s"undefined function $name") + } + func(children) } } @@ -70,30 +76,89 @@ object EmptyFunctionRegistry extends FunctionRegistry { override def lookupFunction(name: String, children: Seq[Expression]): Expression = { throw new UnsupportedOperationException } - - override def conf: CatalystConf = throw new UnsupportedOperationException } -/** - * Build a map with String type of key, and it also supports either key case - * sensitive or insensitive. - * TODO move this into util folder? 
- */ -object StringKeyHashMap { - def apply[T](caseSensitive: Boolean): StringKeyHashMap[T] = caseSensitive match { - case false => new StringKeyHashMap[T](_.toLowerCase) - case true => new StringKeyHashMap[T](identity) - } -} -class StringKeyHashMap[T](normalizer: (String) => String) { - private val base = new collection.mutable.HashMap[String, T]() +object FunctionRegistry { - def apply(key: String): T = base(normalizer(key)) + type FunctionBuilder = Seq[Expression] => Expression - def get(key: String): Option[T] = base.get(normalizer(key)) - def put(key: String, value: T): Option[T] = base.put(normalizer(key), value) - def remove(key: String): Option[T] = base.remove(normalizer(key)) - def iterator: Iterator[(String, T)] = base.toIterator + val expressions: Map[String, FunctionBuilder] = Map( + // Non aggregate functions + expression[Abs]("abs"), + expression[CreateArray]("array"), + expression[Coalesce]("coalesce"), + expression[Explode]("explode"), + expression[Lower]("lower"), + expression[Substring]("substr"), + expression[Substring]("substring"), + expression[Rand]("rand"), + expression[Randn]("randn"), + expression[CreateStruct]("struct"), + expression[Sqrt]("sqrt"), + expression[Upper]("upper"), + + // Math functions + expression[Acos]("acos"), + expression[Asin]("asin"), + expression[Atan]("atan"), + expression[Atan2]("atan2"), + expression[Cbrt]("cbrt"), + expression[Ceil]("ceil"), + expression[Cos]("cos"), + expression[Exp]("exp"), + expression[Expm1]("expm1"), + expression[Floor]("floor"), + expression[Hypot]("hypot"), + expression[Log]("log"), + expression[Log10]("log10"), + expression[Log1p]("log1p"), + expression[Pow]("pow"), + expression[Rint]("rint"), + expression[Signum]("signum"), + expression[Sin]("sin"), + expression[Sinh]("sinh"), + expression[Tan]("tan"), + expression[Tanh]("tanh"), + expression[ToDegrees]("todegrees"), + expression[ToRadians]("toradians"), + + // aggregate functions + expression[Average]("avg"), + expression[Count]("count"), + expression[First]("first"), + expression[Last]("last"), + expression[Max]("max"), + expression[Min]("min"), + expression[Sum]("sum") + ) + + /** See usage above. */ + private def expression[T <: Expression](name: String) + (implicit tag: ClassTag[T]): (String, FunctionBuilder) = { + // Use the companion class to find apply methods. + val objectClass = Class.forName(tag.runtimeClass.getName + "$") + val companionObj = objectClass.getDeclaredField("MODULE$").get(null) + + // See if we can find an apply that accepts Seq[Expression] + val varargApply = Try(objectClass.getDeclaredMethod("apply", classOf[Seq[_]])).toOption + + val builder = (expressions: Seq[Expression]) => { + if (varargApply.isDefined) { + // If there is an apply method that accepts Seq[Expression], use that one. + varargApply.get.invoke(companionObj, expressions).asInstanceOf[Expression] + } else { + // Otherwise, find an apply method that matches the number of arguments, and use that. 
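The preceding comment describes the arity-based fallback used by `FunctionRegistry.expression` when no vararg `apply` taking `Seq[Expression]` exists. The standalone sketch below illustrates only that reflection pattern; the `Box` case class and `buildByArity` helper are invented for this example and are not part of Spark.

{% highlight scala %}
// Toy stand-in for a Catalyst expression: a case class whose companion gains an
// extra one-argument apply, mirroring how Substring exposes a two-argument form.
case class Box(a: String, b: String)

object Box {
  def apply(a: String): Box = new Box(a, a)
}

// Select the companion apply whose parameter count matches the argument list.
def buildByArity(args: Seq[String]): Box = {
  val companionClass = Box.getClass
  val params = Seq.fill(args.size)(classOf[String])
  val applyMethod = companionClass.getDeclaredMethod("apply", params: _*)
  applyMethod.invoke(Box, args: _*).asInstanceOf[Box]
}

buildByArity(Seq("x"))       // Box(x,x) via the hand-written one-argument apply
buildByArity(Seq("x", "y"))  // Box(x,y) via the compiler-generated two-argument apply
{% endhighlight %}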
+ val params = Seq.fill(expressions.size)(classOf[Expression]) + val f = Try(objectClass.getDeclaredMethod("apply", params : _*)) match { + case Success(e) => + e + case Failure(e) => + throw new AnalysisException(s"Invalid number of arguments for function $name") + } + f.invoke(companionObj, expressions : _*).asInstanceOf[Expression] + } + } + (name, builder) + } } - diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala index a9a9c0cfb7027..f2ed1f0929987 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala @@ -23,6 +23,15 @@ import org.apache.spark.sql.catalyst.trees import org.apache.spark.sql.catalyst.trees.TreeNode import org.apache.spark.sql.types._ + +/** + * For Catalyst to work correctly, concrete implementations of [[Expression]]s must be case classes + * whose constructor arguments are all Expressions types. In addition, if we want to support more + * than one constructor, define those constructors explicitly as apply methods in the companion + * object. + * + * See [[Substring]] for an example. + */ abstract class Expression extends TreeNode[Expression] { self: Product => diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/random.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/random.scala index b2647124c4e49..6e4e9cb1be090 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/random.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/random.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.catalyst.expressions import org.apache.spark.TaskContext +import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.types.{DataType, DoubleType} import org.apache.spark.util.Utils import org.apache.spark.util.random.XORShiftRandom @@ -46,11 +47,29 @@ abstract class RDG(seed: Long) extends LeafExpression with Serializable { } /** Generate a random column with i.i.d. uniformly distributed values in [0, 1). */ -case class Rand(seed: Long = Utils.random.nextLong()) extends RDG(seed) { +case class Rand(seed: Long) extends RDG(seed) { override def eval(input: Row): Double = rng.nextDouble() } +object Rand { + def apply(): Rand = apply(Utils.random.nextLong()) + + def apply(seed: Expression): Rand = apply(seed match { + case IntegerLiteral(s) => s + case _ => throw new AnalysisException("Input argument to rand must be an integer literal.") + }) +} + /** Generate a random column with i.i.d. gaussian random distribution. 
*/ -case class Randn(seed: Long = Utils.random.nextLong()) extends RDG(seed) { +case class Randn(seed: Long) extends RDG(seed) { override def eval(input: Row): Double = rng.nextGaussian() } + +object Randn { + def apply(): Randn = apply(Utils.random.nextLong()) + + def apply(seed: Expression): Randn = apply(seed match { + case IntegerLiteral(s) => s + case _ => throw new AnalysisException("Input argument to rand must be an integer literal.") + }) +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringOperations.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringOperations.scala index aae122a981e47..856f56488c7a5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringOperations.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringOperations.scala @@ -227,6 +227,7 @@ case class Substring(str: Expression, pos: Expression, len: Expression) override def foldable: Boolean = str.foldable && pos.foldable && len.foldable override def nullable: Boolean = str.nullable || pos.nullable || len.nullable + override def dataType: DataType = { if (!resolved) { throw new UnresolvedException(this, s"Cannot resolve since $children are not resolved") @@ -287,3 +288,9 @@ case class Substring(str: Expression, pos: Expression, len: Expression) case _ => s"SUBSTR($str, $pos, $len)" } } + +object Substring { + def apply(str: Expression, pos: Expression): Substring = { + apply(str, pos, Literal(Integer.MAX_VALUE)) + } +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringKeyHashMap.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringKeyHashMap.scala new file mode 100644 index 0000000000000..191d5e6399fc9 --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringKeyHashMap.scala @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.util + +/** + * Build a map with String type of key, and it also supports either key case + * sensitive or insensitive. 
+ */ +object StringKeyHashMap { + def apply[T](caseSensitive: Boolean): StringKeyHashMap[T] = caseSensitive match { + case false => new StringKeyHashMap[T](_.toLowerCase) + case true => new StringKeyHashMap[T](identity) + } +} + + +class StringKeyHashMap[T](normalizer: (String) => String) { + private val base = new collection.mutable.HashMap[String, T]() + + def apply(key: String): T = base(normalizer(key)) + + def get(key: String): Option[T] = base.get(normalizer(key)) + + def put(key: String, value: T): Option[T] = base.put(normalizer(key), value) + + def remove(key: String): Option[T] = base.remove(normalizer(key)) + + def iterator: Iterator[(String, T)] = base.toIterator +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index ddb54025baa24..8cad3885b7d46 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -120,7 +120,11 @@ class SQLContext(@transient val sparkContext: SparkContext) // TODO how to handle the temp function per user session? @transient - protected[sql] lazy val functionRegistry: FunctionRegistry = new SimpleFunctionRegistry(conf) + protected[sql] lazy val functionRegistry: FunctionRegistry = { + val fr = new SimpleFunctionRegistry + FunctionRegistry.expressions.foreach { case (name, func) => fr.registerFunction(name, func) } + fr + } @transient protected[sql] lazy val analyzer: Analyzer = diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala index 064c040d2b771..703a34c47ec20 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala @@ -25,6 +25,48 @@ class UDFSuite extends QueryTest { private lazy val ctx = org.apache.spark.sql.test.TestSQLContext import ctx.implicits._ + test("built-in fixed arity expressions") { + val df = ctx.emptyDataFrame + df.selectExpr("rand()", "randn()", "rand(5)", "randn(50)") + } + + test("built-in vararg expressions") { + val df = Seq((1, 2)).toDF("a", "b") + df.selectExpr("array(a, b)") + df.selectExpr("struct(a, b)") + } + + test("built-in expressions with multiple constructors") { + val df = Seq(("abcd", 2)).toDF("a", "b") + df.selectExpr("substr(a, 2)", "substr(a, 2, 3)").collect() + } + + test("count") { + val df = Seq(("abcd", 2)).toDF("a", "b") + df.selectExpr("count(a)") + } + + test("count distinct") { + val df = Seq(("abcd", 2)).toDF("a", "b") + df.selectExpr("count(distinct a)") + } + + test("error reporting for incorrect number of arguments") { + val df = ctx.emptyDataFrame + val e = intercept[AnalysisException] { + df.selectExpr("substr('abcd', 2, 3, 4)") + } + assert(e.getMessage.contains("arguments")) + } + + test("error reporting for undefined functions") { + val df = ctx.emptyDataFrame + val e = intercept[AnalysisException] { + df.selectExpr("a_function_that_does_not_exist()") + } + assert(e.getMessage.contains("undefined function")) + } + test("Simple UDF") { ctx.udf.register("strLenScala", (_: String).length) assert(ctx.sql("SELECT strLenScala('test')").head().getInt(0) === 4) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index b8f294c262af7..3b8cafb4a6c37 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -39,13 +39,12 @@ import org.apache.hadoop.hive.serde2.io.{DateWritable, TimestampWritable} import org.apache.spark.SparkContext import org.apache.spark.annotation.Experimental import org.apache.spark.sql._ -import org.apache.spark.sql.catalyst.analysis.{Analyzer, EliminateSubQueries, OverrideCatalog, OverrideFunctionRegistry} +import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.execution.{ExecutedCommand, ExtractPythonUdfs, SetCommand} import org.apache.spark.sql.hive.client._ import org.apache.spark.sql.hive.execution.{DescribeHiveTableCommand, HiveNativeCommand} import org.apache.spark.sql.sources.DataSourceStrategy -import org.apache.spark.sql.catalyst.CatalystConf import org.apache.spark.sql.types._ import org.apache.spark.util.Utils @@ -374,10 +373,8 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { // Note that HiveUDFs will be overridden by functions registered in this context. @transient - override protected[sql] lazy val functionRegistry = - new HiveFunctionRegistry with OverrideFunctionRegistry { - override def conf: CatalystConf = currentSession().conf - } + override protected[sql] lazy val functionRegistry: FunctionRegistry = + new HiveFunctionRegistry with OverrideFunctionRegistry /* An analyzer that uses the Hive metastore. */ @transient diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala index 01f47352b2313..6e6ac987b668a 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala @@ -17,11 +17,8 @@ package org.apache.spark.sql.hive -import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AggregationBuffer -import org.apache.hadoop.hive.ql.udf.generic.GenericUDFUtils.ConversionHelper -import org.apache.spark.sql.AnalysisException - import scala.collection.mutable.ArrayBuffer +import scala.collection.JavaConversions._ import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspector, ConstantObjectInspector} import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory.ObjectInspectorOptions @@ -30,8 +27,11 @@ import org.apache.hadoop.hive.ql.exec._ import org.apache.hadoop.hive.ql.udf.{UDFType => HiveUDFType} import org.apache.hadoop.hive.ql.udf.generic._ import org.apache.hadoop.hive.ql.udf.generic.GenericUDF._ +import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AggregationBuffer +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFUtils.ConversionHelper import org.apache.spark.Logging +import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.analysis import org.apache.spark.sql.catalyst.errors.TreeNodeException import org.apache.spark.sql.catalyst.expressions._ @@ -40,20 +40,18 @@ import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.hive.HiveShim._ import org.apache.spark.sql.types._ -/* Implicit conversions */ -import scala.collection.JavaConversions._ private[hive] abstract class HiveFunctionRegistry extends analysis.FunctionRegistry with HiveInspectors { def getFunctionInfo(name: String): FunctionInfo = FunctionRegistry.getFunctionInfo(name) - def lookupFunction(name: String, children: Seq[Expression]): Expression = { + override def lookupFunction(name: String, children: Seq[Expression]): Expression = { // We only look it up to see if it 
exists, but do not include it in the HiveUDF since it is // not always serializable. val functionInfo: FunctionInfo = Option(FunctionRegistry.getFunctionInfo(name.toLowerCase)).getOrElse( - sys.error(s"Couldn't find function $name")) + throw new AnalysisException(s"undefined function $name")) val functionClassName = functionInfo.getFunctionClass.getName From e6fb6cedf3ecbde6f01d4753d7d05d0c52827fce Mon Sep 17 00:00:00 2001 From: Kousuke Saruta Date: Tue, 9 Jun 2015 12:19:01 +0100 Subject: [PATCH 16/18] [STREAMING] [DOC] Remove duplicated description about WAL I noticed there is a duplicated description about WAL. ``` To ensure zero-data loss, you have to additionally enable Write Ahead Logs in Spark Streaming. To ensure zero data loss, enable the Write Ahead Logs (introduced in Spark 1.2). ``` Let's remove the duplication. I don't file this issue in JIRA because it's minor. Author: Kousuke Saruta Closes #6719 from sarutak/remove-multiple-description and squashes the following commits: cc9bb21 [Kousuke Saruta] Removed duplicated description about WAL --- docs/streaming-kafka-integration.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/streaming-kafka-integration.md b/docs/streaming-kafka-integration.md index d6d5605948a5a..998c8c994e4b4 100644 --- a/docs/streaming-kafka-integration.md +++ b/docs/streaming-kafka-integration.md @@ -7,7 +7,7 @@ title: Spark Streaming + Kafka Integration Guide ## Approach 1: Receiver-based Approach This approach uses a Receiver to receive the data. The Received is implemented using the Kafka high-level consumer API. As with all receivers, the data received from Kafka through a Receiver is stored in Spark executors, and then jobs launched by Spark Streaming processes the data. -However, under default configuration, this approach can lose data under failures (see [receiver reliability](streaming-programming-guide.html#receiver-reliability). To ensure zero-data loss, you have to additionally enable Write Ahead Logs in Spark Streaming. To ensure zero data loss, enable the Write Ahead Logs (introduced in Spark 1.2). This synchronously saves all the received Kafka data into write ahead logs on a distributed file system (e.g HDFS), so that all the data can be recovered on failure. See [Deploying section](streaming-programming-guide.html#deploying-applications) in the streaming programming guide for more details on Write Ahead Logs. +However, under default configuration, this approach can lose data under failures (see [receiver reliability](streaming-programming-guide.html#receiver-reliability). To ensure zero-data loss, you have to additionally enable Write Ahead Logs in Spark Streaming (introduced in Spark 1.2). This synchronously saves all the received Kafka data into write ahead logs on a distributed file system (e.g HDFS), so that all the data can be recovered on failure. See [Deploying section](streaming-programming-guide.html#deploying-applications) in the streaming programming guide for more details on Write Ahead Logs. Next, we discuss how to use this approach in your streaming application. 
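To ground the paragraph above, here is a hedged sketch of a receiver-based Kafka stream with the Write Ahead Log turned on. The checkpoint directory, ZooKeeper quorum, consumer group, and topic map are placeholder values chosen for illustration; only the configuration key and the `KafkaUtils.createStream` call reflect the APIs the guide refers to.

{% highlight scala %}
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val conf = new SparkConf()
  .setAppName("KafkaReceiverWithWAL")
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")  // enable the WAL (Spark 1.2+)

val ssc = new StreamingContext(conf, Seconds(10))
ssc.checkpoint("hdfs:///tmp/kafka-wal-checkpoint")  // the WAL is written under the checkpoint directory

// With the WAL on, received blocks are already persisted reliably, so a serialized,
// non-replicated storage level is a common choice for the receiver.
val lines = KafkaUtils.createStream(
  ssc, "zk-host:2181", "example-group", Map("example-topic" -> 1),
  StorageLevel.MEMORY_AND_DISK_SER).map(_._2)

lines.count().print()
ssc.start()
ssc.awaitTermination()
{% endhighlight %}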
From 6c1723abeb4e0580efec05a655343f46521fc265 Mon Sep 17 00:00:00 2001 From: MechCoder Date: Tue, 9 Jun 2015 15:00:35 +0100 Subject: [PATCH 17/18] [SPARK-8140] [MLLIB] Remove construct to get weights in StreamingLinearAlgorithm Author: MechCoder Closes #6720 from MechCoder/empty_model_check and squashes the following commits: 3a07de5 [MechCoder] Remove construct to get weights in StreamingLinearAlgorithm --- .../spark/mllib/regression/StreamingLinearAlgorithm.scala | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearAlgorithm.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearAlgorithm.scala index 39308e5ae1dde..aee51bf22d8d0 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearAlgorithm.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearAlgorithm.scala @@ -83,12 +83,7 @@ abstract class StreamingLinearAlgorithm[ throw new IllegalArgumentException("Model must be initialized before starting training.") } data.foreachRDD { (rdd, time) => - val initialWeights = - model match { - case Some(m) => - m.weights - } - model = Some(algorithm.run(rdd, initialWeights)) + model = Some(algorithm.run(rdd, model.get.weights)) logInfo("Model updated at time %s".format(time.toString)) val display = model.get.weights.size match { case x if x > 100 => model.get.weights.toArray.take(100).mkString("[", ",", "...") From 490d5a72ec1e5105f030fd7110acf62534e05f5a Mon Sep 17 00:00:00 2001 From: FavioVazquez Date: Tue, 9 Jun 2015 15:02:18 +0100 Subject: [PATCH 18/18] [SPARK-8274] [DOCUMENTATION-MLLIB] Fix wrong URLs in MLlib Frequent Pattern Mining Documentation There is a mistake in the URLs of the Scala section of FP-Growth in the MLlib Frequent Pattern Mining documentation. The URL points to https://spark.apache.org/docs/latest/api/java/org/apache/spark/mllib/fpm/FPGrowth.html which is the Java's API, the link should point to the Scala API https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.mllib.fpm.FPGrowth There's another mistake in the FP-GrowthModel in the same section, the link points, again, to the Java's API https://spark.apache.org/docs/latest/api/java/org/apache/spark/mllib/fpm/FPGrowthModel.html, the link should point to the Scala API https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.mllib.fpm.FPGrowthModel Author: FavioVazquez Closes #6722 from FavioVazquez/fix-wrog-urls-mllib-fpgrowth and squashes the following commits: e1ca54d [FavioVazquez] - Fixed wrong URLs in MLlib Frequent Pattern Mining, FP-Growth Scala section ad882a3 [FavioVazquez] Merge remote-tracking branch 'upstream/master' f27a20b [FavioVazquez] Merge remote-tracking branch 'upstream/master' 9af7074 [FavioVazquez] Merge remote-tracking branch 'upstream/master' edab1ef [FavioVazquez] Merge remote-tracking branch 'upstream/master' b2e2f8c [FavioVazquez] Merge remote-tracking branch 'upstream/master' --- docs/mllib-frequent-pattern-mining.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/mllib-frequent-pattern-mining.md b/docs/mllib-frequent-pattern-mining.md index 9fd9be0dd01b1..bcc066a185526 100644 --- a/docs/mllib-frequent-pattern-mining.md +++ b/docs/mllib-frequent-pattern-mining.md @@ -39,11 +39,11 @@ MLlib's FP-growth implementation takes the following (hyper-)parameters:
-[`FPGrowth`](api/java/org/apache/spark/mllib/fpm/FPGrowth.html) implements the +[`FPGrowth`](api/scala/index.html#org.apache.spark.mllib.fpm.FPGrowth) implements the FP-growth algorithm. It takes a `JavaRDD` of transactions, where each transaction is an `Iterable` of items of a generic type. Calling `FPGrowth.run` with transactions returns an -[`FPGrowthModel`](api/java/org/apache/spark/mllib/fpm/FPGrowthModel.html) +[`FPGrowthModel`](api/scala/index.html#org.apache.spark.mllib.fpm.FPGrowthModel) that stores the frequent itemsets with their frequencies. {% highlight scala %}
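// The document breaks off at the start of this Scala example. What follows is a hedged
// sketch of typical FPGrowth usage against the API linked above, not the guide's own
// snippet: the sample transactions, minSupport value, and partition count are assumed,
// and an existing SparkContext named sc is presumed to be in scope.
import org.apache.spark.mllib.fpm.FPGrowth
import org.apache.spark.rdd.RDD

val transactions: RDD[Array[String]] = sc.parallelize(Seq(
  Array("r", "z", "h", "k", "p"),
  Array("z", "y", "x", "w", "v", "u", "t", "s"),
  Array("s", "x", "o", "n", "r")))

val fpg = new FPGrowth()
  .setMinSupport(0.2)
  .setNumPartitions(10)
val model = fpg.run(transactions)  // an FPGrowthModel holding the frequent itemsets

model.freqItemsets.collect().foreach { itemset =>
  println(itemset.items.mkString("[", ",", "]") + ", " + itemset.freq)
}
{% endhighlight %}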