From a6c72ab16e7a3027739ab419819f5222e270838e Mon Sep 17 00:00:00 2001 From: Michael Armbrust Date: Sat, 7 Jun 2014 14:20:33 -0700 Subject: [PATCH 01/26] [SPARK-1994][SQL] Weird data corruption bug when running Spark SQL on data in HDFS Basically there is a race condition (possibly a scala bug?) when these values are recomputed on all of the slaves that results in an incorrect projection being generated (possibly because the GUID uniqueness contract is broken?). In general we should probably enforce that all expression planning occurs on the driver, as is now occurring here. Author: Michael Armbrust Closes #1004 from marmbrus/fixAggBug and squashes the following commits: e0c116c [Michael Armbrust] Compute aggregate expression during planning instead of lazily on workers. --- .../apache/spark/sql/execution/Aggregate.scala | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala index 604914e547790..34d88fe4bd7de 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala @@ -77,8 +77,7 @@ case class Aggregate( resultAttribute: AttributeReference) /** A list of aggregates that need to be computed for each group. */ - @transient - private[this] lazy val computedAggregates = aggregateExpressions.flatMap { agg => + private[this] val computedAggregates = aggregateExpressions.flatMap { agg => agg.collect { case a: AggregateExpression => ComputedAggregate( @@ -89,8 +88,7 @@ case class Aggregate( }.toArray /** The schema of the result of all aggregate evaluations */ - @transient - private[this] lazy val computedSchema = computedAggregates.map(_.resultAttribute) + private[this] val computedSchema = computedAggregates.map(_.resultAttribute) /** Creates a new aggregate buffer for a group. */ private[this] def newAggregateBuffer(): Array[AggregateFunction] = { @@ -104,8 +102,7 @@ case class Aggregate( } /** Named attributes used to substitute grouping attributes into the final result. */ - @transient - private[this] lazy val namedGroups = groupingExpressions.map { + private[this] val namedGroups = groupingExpressions.map { case ne: NamedExpression => ne -> ne.toAttribute case e => e -> Alias(e, s"groupingExpr:$e")().toAttribute } @@ -114,16 +111,14 @@ case class Aggregate( * A map of substitutions that are used to insert the aggregate expressions and grouping * expression into the final result expression. */ - @transient - private[this] lazy val resultMap = + private[this] val resultMap = (computedAggregates.map { agg => agg.unbound -> agg.resultAttribute } ++ namedGroups).toMap /** * Substituted version of aggregateExpressions expressions which are used to compute final * output rows given a group and the result of all aggregate computations. 
*/ - @transient - private[this] lazy val resultExpressions = aggregateExpressions.map { agg => + private[this] val resultExpressions = aggregateExpressions.map { agg => agg.transform { case e: Expression if resultMap.contains(e) => resultMap(e) } From 3ace10dc91e72ebe5013d5106eb0968a77c99d8d Mon Sep 17 00:00:00 2001 From: Patrick Wendell Date: Sat, 7 Jun 2014 16:16:37 -0700 Subject: [PATCH 02/26] HOTFIX: Support empty body in merge script Discovered in #992 Author: Patrick Wendell Closes #1007 from pwendell/hotfix and squashes the following commits: af90aa0 [Patrick Wendell] HOTFIX: Support empty body in merge script --- dev/merge_spark_pr.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/dev/merge_spark_pr.py b/dev/merge_spark_pr.py index e3ac32ef1a12e..ffb70096d6014 100755 --- a/dev/merge_spark_pr.py +++ b/dev/merge_spark_pr.py @@ -128,8 +128,9 @@ def merge_pr(pr_num, target_ref): merge_message_flags = [] - for p in [title, body]: - merge_message_flags += ["-m", p] + merge_message_flags += ["-m", title] + if body != None: + merge_message_flags += ["-m", body] authors = "\n".join(["Author: %s" % a for a in distinct_authors]) From 7b877b27053bfb7092e250e01a3b887e1b50a109 Mon Sep 17 00:00:00 2001 From: Neville Li Date: Sat, 7 Jun 2014 16:22:26 -0700 Subject: [PATCH 03/26] SPARK-2056 Set RDD name to input path Author: Neville Li Closes #992 from nevillelyh/master and squashes the following commits: 3011739 [Neville Li] [SPARK-2056] Set RDD name to input path --- core/src/main/scala/org/apache/spark/SparkContext.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index d941aea9d7eb2..d721aba709600 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -455,7 +455,7 @@ class SparkContext(config: SparkConf) extends Logging { */ def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String] = { hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], - minPartitions).map(pair => pair._2.toString) + minPartitions).map(pair => pair._2.toString).setName(path) } /** @@ -496,7 +496,7 @@ class SparkContext(config: SparkConf) extends Logging { classOf[String], classOf[String], updateConf, - minPartitions) + minPartitions).setName(path) } /** @@ -551,7 +551,7 @@ class SparkContext(config: SparkConf) extends Logging { inputFormatClass, keyClass, valueClass, - minPartitions) + minPartitions).setName(path) } /** @@ -623,7 +623,7 @@ class SparkContext(config: SparkConf) extends Logging { val job = new NewHadoopJob(conf) NewFileInputFormat.addInputPath(job, new Path(path)) val updatedConf = job.getConfiguration - new NewHadoopRDD(this, fClass, kClass, vClass, updatedConf) + new NewHadoopRDD(this, fClass, kClass, vClass, updatedConf).setName(path) } /** From a338834f90556d78119b37985b603ecee85f97ed Mon Sep 17 00:00:00 2001 From: Bernardo Gomez Palacio Date: Sun, 8 Jun 2014 01:24:52 -0700 Subject: [PATCH 04/26] SPARK-2026: Maven Hadoop Profiles Should Set The Hadoop Version The Maven Profiles that refer to hadoopX, e.g. `hadoop2.4`, should set the expected `hadoop.version` and `yarn.version`, for example (XML element names reconstructed from the pom.xml of that era):
```
<profile>
  <id>hadoop-2.4</id>
  <properties>
    <hadoop.version>2.4.0</hadoop.version>
    <yarn.version>${hadoop.version}</yarn.version>
    <protobuf.version>2.5.0</protobuf.version>
    <jets3t.version>0.9.0</jets3t.version>
  </properties>
</profile>
```
Builds can still define the `-Dhadoop.version` option, but this will correctly default the Hadoop version to the one expected according to the profile that is selected. e.g. 
```$ mvn -P hadoop-2.4,yarn clean install``` or ```$ mvn -P hadoop-0.23,yarn clean install``` [ticket] : https://issues.apache.org/jira/browse/SPARK-2026 Author : berngp Reviewer : ? Author: Bernardo Gomez Palacio Closes #998 from berngp/feature/SPARK-2026 and squashes the following commits: 07ba4f7 [Bernardo Gomez Palacio] SPARK-2026: Maven Hadoop Profiles Should Set The Hadoop Version --- pom.xml | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/pom.xml b/pom.xml index 891468b21bfff..0d46bb4114f73 100644 --- a/pom.xml +++ b/pom.xml @@ -209,14 +209,14 @@ spring-releases Spring Release Repository - http://repo.spring.io/libs-release + http://repo.spring.io/libs-release true false - + @@ -987,11 +987,15 @@ avro + + 0.23.10 + hadoop-2.2 + 2.2.0 2.5.0 @@ -999,6 +1003,7 @@ hadoop-2.3 + 2.3.0 2.5.0 0.9.0 @@ -1007,6 +1012,7 @@ hadoop-2.4 + 2.4.0 2.5.0 0.9.0 From ee96e9406613e621837360b15c34ea7c7220a7a3 Mon Sep 17 00:00:00 2001 From: Colin Patrick McCabe Date: Sun, 8 Jun 2014 12:27:34 -0700 Subject: [PATCH 05/26] SPARK-1898: In deploy.yarn.Client, use YarnClient not YarnClientImpl https://issues.apache.org/jira/browse/SPARK-1898 Author: Colin Patrick McCabe Closes #850 from cmccabe/master and squashes the following commits: d66eddc [Colin Patrick McCabe] SPARK-1898: In deploy.yarn.Client, use YarnClient rather than YarnClientImpl --- .../cluster/YarnClientSchedulerBackend.scala | 2 +- .../org/apache/spark/deploy/yarn/Client.scala | 25 ++++++++++++------- 2 files changed, 17 insertions(+), 10 deletions(-) diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala index e01ed5a57d697..039cf4f276119 100644 --- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala +++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala @@ -112,7 +112,7 @@ private[spark] class YarnClientSchedulerBackend( override def stop() { super.stop() - client.stop() + client.stop logInfo("Stopped") } diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 1b6bfb42a5c1c..393edd1f2d670 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -25,7 +25,7 @@ import org.apache.hadoop.yarn.api._ import org.apache.hadoop.yarn.api.ApplicationConstants.Environment import org.apache.hadoop.yarn.api.protocolrecords._ import org.apache.hadoop.yarn.api.records._ -import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl +import org.apache.hadoop.yarn.client.api.YarnClient import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.yarn.ipc.YarnRPC import org.apache.hadoop.yarn.util.{Apps, Records} @@ -37,7 +37,9 @@ import org.apache.spark.{Logging, SparkConf} * Version of [[org.apache.spark.deploy.yarn.ClientBase]] tailored to YARN's stable API. 
*/ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: SparkConf) - extends YarnClientImpl with ClientBase with Logging { + extends ClientBase with Logging { + + val yarnClient = YarnClient.createYarnClient def this(clientArgs: ClientArguments, spConf: SparkConf) = this(clientArgs, new Configuration(), spConf) @@ -53,8 +55,8 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa def runApp(): ApplicationId = { validateArgs() // Initialize and start the client service. - init(yarnConf) - start() + yarnClient.init(yarnConf) + yarnClient.start() // Log details about this YARN cluster (e.g, the number of slave machines/NodeManagers). logClusterResourceDetails() @@ -63,7 +65,7 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa // interface). // Get a new client application. - val newApp = super.createApplication() + val newApp = yarnClient.createApplication() val newAppResponse = newApp.getNewApplicationResponse() val appId = newAppResponse.getApplicationId() @@ -99,11 +101,11 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa } def logClusterResourceDetails() { - val clusterMetrics: YarnClusterMetrics = super.getYarnClusterMetrics + val clusterMetrics: YarnClusterMetrics = yarnClient.getYarnClusterMetrics logInfo("Got Cluster metric info from ApplicationsManager (ASM), number of NodeManagers: " + clusterMetrics.getNumNodeManagers) - val queueInfo: QueueInfo = super.getQueueInfo(args.amQueue) + val queueInfo: QueueInfo = yarnClient.getQueueInfo(args.amQueue) logInfo( """Queue info ... queueName: %s, queueCurrentCapacity: %s, queueMaxCapacity: %s, queueApplicationCount = %s, queueChildQueueCount = %s""".format( queueInfo.getQueueName, @@ -132,15 +134,20 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa def submitApp(appContext: ApplicationSubmissionContext) = { // Submit the application to the applications manager. logInfo("Submitting application to ASM") - super.submitApplication(appContext) + yarnClient.submitApplication(appContext) } + def getApplicationReport(appId: ApplicationId) = + yarnClient.getApplicationReport(appId) + + def stop = yarnClient.stop + def monitorApplication(appId: ApplicationId): Boolean = { val interval = sparkConf.getLong("spark.yarn.report.interval", 1000) while (true) { Thread.sleep(interval) - val report = super.getApplicationReport(appId) + val report = yarnClient.getApplicationReport(appId) logInfo("Application report from ASM: \n" + "\t application identifier: " + appId.toString() + "\n" + From a71c6d1cf0bbc027f633a65042191cf2201330d6 Mon Sep 17 00:00:00 2001 From: zsxwing Date: Sun, 8 Jun 2014 14:18:52 -0700 Subject: [PATCH 06/26] SPARK-1628: Add missing hashCode methods in Partitioner subclasses JIRA: https://issues.apache.org/jira/browse/SPARK-1628 Added `hashCode` in HashPartitioner, RangePartitioner, PythonPartitioner and PageRankUtils.CustomPartitioner. 
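For context, a minimal sketch (hedged, not taken from the patch itself) of the equals/hashCode contract this change restores: partitioners that compare equal must also produce equal hash codes, otherwise hash-based collections can treat two equivalent partitioners as distinct.

```scala
import org.apache.spark.HashPartitioner

// Hedged illustration: equals is already overridden to compare numPartitions,
// so hashCode must agree, e.g. by returning numPartitions as this patch does.
val a = new HashPartitioner(4)
val b = new HashPartitioner(4)
assert(a == b)                    // same number of partitions => equal
assert(a.hashCode == b.hashCode)  // holds once hashCode is also overridden
```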
Author: zsxwing Closes #549 from zsxwing/SPARK-1628 and squashes the following commits: 2620936 [zsxwing] SPARK-1628: Add missing hashCode methods in Partitioner subclasses --- .../scala/org/apache/spark/Partitioner.scala | 17 ++++++++++++++++- .../spark/api/python/PythonPartitioner.scala | 2 ++ .../spark/examples/bagel/PageRankUtils.scala | 2 ++ 3 files changed, 20 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala b/core/src/main/scala/org/apache/spark/Partitioner.scala index 9155159cf6aeb..01e918fabec67 100644 --- a/core/src/main/scala/org/apache/spark/Partitioner.scala +++ b/core/src/main/scala/org/apache/spark/Partitioner.scala @@ -83,6 +83,8 @@ class HashPartitioner(partitions: Int) extends Partitioner { case _ => false } + + override def hashCode: Int = numPartitions } /** @@ -119,7 +121,7 @@ class RangePartitioner[K : Ordering : ClassTag, V]( } } - def numPartitions = partitions + def numPartitions = rangeBounds.length + 1 private val binarySearch: ((Array[K], K) => Int) = CollectionsUtils.makeBinarySearch[K] @@ -155,4 +157,17 @@ class RangePartitioner[K : Ordering : ClassTag, V]( case _ => false } + + + override def hashCode(): Int = { + val prime = 31 + var result = 1 + var i = 0 + while (i < rangeBounds.length) { + result = prime * result + rangeBounds(i).hashCode + i += 1 + } + result = prime * result + ascending.hashCode + result + } } diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonPartitioner.scala b/core/src/main/scala/org/apache/spark/api/python/PythonPartitioner.scala index 95bec5030bfdd..e230d222b8604 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonPartitioner.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonPartitioner.scala @@ -50,4 +50,6 @@ private[spark] class PythonPartitioner( case _ => false } + + override def hashCode: Int = 31 * numPartitions + pyPartitionFunctionId.hashCode } diff --git a/examples/src/main/scala/org/apache/spark/examples/bagel/PageRankUtils.scala b/examples/src/main/scala/org/apache/spark/examples/bagel/PageRankUtils.scala index b97cb8fb02823..e06f4dcd54442 100644 --- a/examples/src/main/scala/org/apache/spark/examples/bagel/PageRankUtils.scala +++ b/examples/src/main/scala/org/apache/spark/examples/bagel/PageRankUtils.scala @@ -124,4 +124,6 @@ class CustomPartitioner(partitions: Int) extends Partitioner { c.numPartitions == numPartitions case _ => false } + + override def hashCode: Int = numPartitions } From e9261d0866a610eab29fa332726186b534d1018f Mon Sep 17 00:00:00 2001 From: maji2014 Date: Sun, 8 Jun 2014 15:14:27 -0700 Subject: [PATCH 07/26] Update run-example Old code can only be ran under spark_home and use "bin/run-example". Error "./run-example: line 55: ./bin/spark-submit: No such file or directory" appears when running in other place. So change this Author: maji2014 Closes #1011 from maji2014/master and squashes the following commits: 2cc1af6 [maji2014] Update run-example Closes #988. --- bin/run-example | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bin/run-example b/bin/run-example index 7caab31daef39..e7a5fe3914fbd 100755 --- a/bin/run-example +++ b/bin/run-example @@ -51,7 +51,7 @@ if [[ ! 
$EXAMPLE_CLASS == org.apache.spark.examples* ]]; then EXAMPLE_CLASS="org.apache.spark.examples.$EXAMPLE_CLASS" fi -./bin/spark-submit \ +"$FWDIR"/bin/spark-submit \ --master $EXAMPLE_MASTER \ --class $EXAMPLE_CLASS \ "$SPARK_EXAMPLES_JAR" \ From 219dc00b30c8d9c4c0a6ce5d566497a93f21cb57 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Sun, 8 Jun 2014 18:39:57 -0700 Subject: [PATCH 08/26] SPARK-1628 follow up: Improve RangePartitioner's documentation. Adding a paragraph clarifying a weird behavior in RangePartitioner. See also #549. Author: Reynold Xin Closes #1012 from rxin/partitioner-doc and squashes the following commits: 6f0109e [Reynold Xin] SPARK-1628 follow up: Improve RangePartitioner's documentation. --- core/src/main/scala/org/apache/spark/Partitioner.scala | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala b/core/src/main/scala/org/apache/spark/Partitioner.scala index 01e918fabec67..e7f75481939a8 100644 --- a/core/src/main/scala/org/apache/spark/Partitioner.scala +++ b/core/src/main/scala/org/apache/spark/Partitioner.scala @@ -90,6 +90,10 @@ class HashPartitioner(partitions: Int) extends Partitioner { /** * A [[org.apache.spark.Partitioner]] that partitions sortable records by range into roughly * equal ranges. The ranges are determined by sampling the content of the RDD passed in. + * + * Note that the actual number of partitions created by the RangePartitioner might not be the same + * as the `partitions` parameter, in the case where the number of sampled records is less than + * the value of `partitions`. */ class RangePartitioner[K : Ordering : ClassTag, V]( partitions: Int, @@ -158,7 +162,6 @@ class RangePartitioner[K : Ordering : ClassTag, V]( false } - override def hashCode(): Int = { val prime = 31 var result = 1 From 15ddbef414d5fd6d4672936ba3c747b5fb7ab52b Mon Sep 17 00:00:00 2001 From: Neville Li Date: Sun, 8 Jun 2014 23:18:27 -0700 Subject: [PATCH 09/26] [SPARK-2067] use relative path for Spark logo in UI Author: Neville Li Closes #1006 from nevillelyh/gh/SPARK-2067 and squashes the following commits: 9ee64cf [Neville Li] [SPARK-2067] use relative path for Spark logo in UI --- core/src/main/scala/org/apache/spark/ui/UIUtils.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala index a43314f48112f..1b104253d545d 100644 --- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala @@ -168,7 +168,7 @@ private[spark] object UIUtils extends Logging { diff --git a/examples/src/main/python/cassandra_inputformat.py b/examples/src/main/python/cassandra_inputformat.py new file mode 100644 index 0000000000000..39fa6b0d22ef5 --- /dev/null +++ b/examples/src/main/python/cassandra_inputformat.py @@ -0,0 +1,79 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import sys + +from pyspark import SparkContext + +""" +Create data in Cassandra first +(following: https://wiki.apache.org/cassandra/GettingStarted) + +cqlsh> CREATE KEYSPACE test + ... WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }; +cqlsh> use test; +cqlsh:test> CREATE TABLE users ( + ... user_id int PRIMARY KEY, + ... fname text, + ... lname text + ... ); +cqlsh:test> INSERT INTO users (user_id, fname, lname) + ... VALUES (1745, 'john', 'smith'); +cqlsh:test> INSERT INTO users (user_id, fname, lname) + ... VALUES (1744, 'john', 'doe'); +cqlsh:test> INSERT INTO users (user_id, fname, lname) + ... VALUES (1746, 'john', 'smith'); +cqlsh:test> SELECT * FROM users; + + user_id | fname | lname +---------+-------+------- + 1745 | john | smith + 1744 | john | doe + 1746 | john | smith +""" +if __name__ == "__main__": + if len(sys.argv) != 4: + print >> sys.stderr, """ + Usage: cassandra_inputformat <host> <keyspace> <cf> + + Run with example jar: + ./bin/spark-submit --driver-class-path /path/to/example/jar /path/to/examples/cassandra_inputformat.py + Assumes you have some data in Cassandra already, running on <host>, in <keyspace> and <cf> + """ + exit(-1) + + host = sys.argv[1] + keyspace = sys.argv[2] + cf = sys.argv[3] + sc = SparkContext(appName="CassandraInputFormat") + + conf = {"cassandra.input.thrift.address":host, + "cassandra.input.thrift.port":"9160", + "cassandra.input.keyspace":keyspace, + "cassandra.input.columnfamily":cf, + "cassandra.input.partitioner.class":"Murmur3Partitioner", + "cassandra.input.page.row.size":"3"} + cass_rdd = sc.newAPIHadoopRDD( + "org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat", + "java.util.Map", + "java.util.Map", + keyConverter="org.apache.spark.examples.pythonconverters.CassandraCQLKeyConverter", + valueConverter="org.apache.spark.examples.pythonconverters.CassandraCQLValueConverter", + conf=conf) + output = cass_rdd.collect() + for (k, v) in output: + print (k, v) diff --git a/examples/src/main/python/hbase_inputformat.py b/examples/src/main/python/hbase_inputformat.py new file mode 100644 index 0000000000000..3289d9880a0f5 --- /dev/null +++ b/examples/src/main/python/hbase_inputformat.py @@ -0,0 +1,72 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +import sys + +from pyspark import SparkContext + +""" +Create test data in HBase first: + +hbase(main):016:0> create 'test', 'f1' +0 row(s) in 1.0430 seconds + +hbase(main):017:0> put 'test', 'row1', 'f1', 'value1' +0 row(s) in 0.0130 seconds + +hbase(main):018:0> put 'test', 'row2', 'f1', 'value2' +0 row(s) in 0.0030 seconds + +hbase(main):019:0> put 'test', 'row3', 'f1', 'value3' +0 row(s) in 0.0050 seconds + +hbase(main):020:0> put 'test', 'row4', 'f1', 'value4' +0 row(s) in 0.0110 seconds + +hbase(main):021:0> scan 'test' +ROW COLUMN+CELL + row1 column=f1:, timestamp=1401883411986, value=value1 + row2 column=f1:, timestamp=1401883415212, value=value2 + row3 column=f1:, timestamp=1401883417858, value=value3 + row4 column=f1:, timestamp=1401883420805, value=value4 +4 row(s) in 0.0240 seconds +""" +if __name__ == "__main__": + if len(sys.argv) != 3: + print >> sys.stderr, """ + Usage: hbase_inputformat <host> <table> + + Run with example jar: + ./bin/spark-submit --driver-class-path /path/to/example/jar /path/to/examples/hbase_inputformat.py
+ Assumes you have some data in HBase already, running on <host>, in <table>
+ """ + exit(-1) + + host = sys.argv[1] + table = sys.argv[2] + sc = SparkContext(appName="HBaseInputFormat") + + conf = {"hbase.zookeeper.quorum": host, "hbase.mapreduce.inputtable": table} + hbase_rdd = sc.newAPIHadoopRDD( + "org.apache.hadoop.hbase.mapreduce.TableInputFormat", + "org.apache.hadoop.hbase.io.ImmutableBytesWritable", + "org.apache.hadoop.hbase.client.Result", + valueConverter="org.apache.spark.examples.pythonconverters.HBaseConverter", + conf=conf) + output = hbase_rdd.collect() + for (k, v) in output: + print (k, v) diff --git a/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala b/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala index 9a00701f985f0..71f53af68f4d3 100644 --- a/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala +++ b/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala @@ -33,6 +33,7 @@ import org.apache.hadoop.mapreduce.Job import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.SparkContext._ + /* Need to create following keyspace and column family in cassandra before running this example Start CQL shell using ./bin/cqlsh and execute following commands diff --git a/examples/src/main/scala/org/apache/spark/examples/HBaseTest.scala b/examples/src/main/scala/org/apache/spark/examples/HBaseTest.scala index a8c338480e6e2..4893b017ed819 100644 --- a/examples/src/main/scala/org/apache/spark/examples/HBaseTest.scala +++ b/examples/src/main/scala/org/apache/spark/examples/HBaseTest.scala @@ -22,7 +22,7 @@ import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor} import org.apache.hadoop.hbase.mapreduce.TableInputFormat import org.apache.spark._ -import org.apache.spark.rdd.NewHadoopRDD + object HBaseTest { def main(args: Array[String]) { diff --git a/examples/src/main/scala/org/apache/spark/examples/pythonconverters/CassandraConverters.scala b/examples/src/main/scala/org/apache/spark/examples/pythonconverters/CassandraConverters.scala new file mode 100644 index 0000000000000..29a65c7a5f295 --- /dev/null +++ b/examples/src/main/scala/org/apache/spark/examples/pythonconverters/CassandraConverters.scala @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.examples.pythonconverters + +import org.apache.spark.api.python.Converter +import java.nio.ByteBuffer +import org.apache.cassandra.utils.ByteBufferUtil +import collection.JavaConversions.{mapAsJavaMap, mapAsScalaMap} + + +/** + * Implementation of [[org.apache.spark.api.python.Converter]] that converts Cassandra + * output to a Map[String, Int] + */ +class CassandraCQLKeyConverter extends Converter[Any, java.util.Map[String, Int]] { + override def convert(obj: Any): java.util.Map[String, Int] = { + val result = obj.asInstanceOf[java.util.Map[String, ByteBuffer]] + mapAsJavaMap(result.mapValues(bb => ByteBufferUtil.toInt(bb))) + } +} + +/** + * Implementation of [[org.apache.spark.api.python.Converter]] that converts Cassandra + * output to a Map[String, String] + */ +class CassandraCQLValueConverter extends Converter[Any, java.util.Map[String, String]] { + override def convert(obj: Any): java.util.Map[String, String] = { + val result = obj.asInstanceOf[java.util.Map[String, ByteBuffer]] + mapAsJavaMap(result.mapValues(bb => ByteBufferUtil.string(bb))) + } +} diff --git a/examples/src/main/scala/org/apache/spark/examples/pythonconverters/HBaseConverter.scala b/examples/src/main/scala/org/apache/spark/examples/pythonconverters/HBaseConverter.scala new file mode 100644 index 0000000000000..42ae960bd64a1 --- /dev/null +++ b/examples/src/main/scala/org/apache/spark/examples/pythonconverters/HBaseConverter.scala @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.examples.pythonconverters + +import org.apache.spark.api.python.Converter +import org.apache.hadoop.hbase.client.Result +import org.apache.hadoop.hbase.util.Bytes + +/** + * Implementation of [[org.apache.spark.api.python.Converter]] that converts a HBase Result + * to a String + */ +class HBaseConverter extends Converter[Any, String] { + override def convert(obj: Any): String = { + val result = obj.asInstanceOf[Result] + Bytes.toStringBinary(result.value()) + } +} diff --git a/python/pyspark/context.py b/python/pyspark/context.py index 211918f5a05ec..062bec2381a8f 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -342,6 +342,143 @@ def wholeTextFiles(self, path, minPartitions=None): return RDD(self._jsc.wholeTextFiles(path, minPartitions), self, PairDeserializer(UTF8Deserializer(), UTF8Deserializer())) + def _dictToJavaMap(self, d): + jm = self._jvm.java.util.HashMap() + if not d: + d = {} + for k, v in d.iteritems(): + jm[k] = v + return jm + + def sequenceFile(self, path, keyClass=None, valueClass=None, keyConverter=None, + valueConverter=None, minSplits=None): + """ + Read a Hadoop SequenceFile with arbitrary key and value Writable class from HDFS, + a local file system (available on all nodes), or any Hadoop-supported file system URI. + The mechanism is as follows: + 1. A Java RDD is created from the SequenceFile or other InputFormat, and the key + and value Writable classes + 2. Serialization is attempted via Pyrolite pickling + 3. If this fails, the fallback is to call 'toString' on each key and value + 4. C{PickleSerializer} is used to deserialize pickled objects on the Python side + + @param path: path to sequncefile + @param keyClass: fully qualified classname of key Writable class + (e.g. "org.apache.hadoop.io.Text") + @param valueClass: fully qualified classname of value Writable class + (e.g. "org.apache.hadoop.io.LongWritable") + @param keyConverter: + @param valueConverter: + @param minSplits: minimum splits in dataset + (default min(2, sc.defaultParallelism)) + """ + minSplits = minSplits or min(self.defaultParallelism, 2) + jrdd = self._jvm.PythonRDD.sequenceFile(self._jsc, path, keyClass, valueClass, + keyConverter, valueConverter, minSplits) + return RDD(jrdd, self, PickleSerializer()) + + def newAPIHadoopFile(self, path, inputFormatClass, keyClass, valueClass, keyConverter=None, + valueConverter=None, conf=None): + """ + Read a 'new API' Hadoop InputFormat with arbitrary key and value class from HDFS, + a local file system (available on all nodes), or any Hadoop-supported file system URI. + The mechanism is the same as for sc.sequenceFile. + + A Hadoop configuration can be passed in as a Python dict. This will be converted into a + Configuration in Java + + @param path: path to Hadoop file + @param inputFormatClass: fully qualified classname of Hadoop InputFormat + (e.g. "org.apache.hadoop.mapreduce.lib.input.TextInputFormat") + @param keyClass: fully qualified classname of key Writable class + (e.g. "org.apache.hadoop.io.Text") + @param valueClass: fully qualified classname of value Writable class + (e.g. 
"org.apache.hadoop.io.LongWritable") + @param keyConverter: (None by default) + @param valueConverter: (None by default) + @param conf: Hadoop configuration, passed in as a dict + (None by default) + """ + jconf = self._dictToJavaMap(conf) + jrdd = self._jvm.PythonRDD.newAPIHadoopFile(self._jsc, path, inputFormatClass, keyClass, + valueClass, keyConverter, valueConverter, jconf) + return RDD(jrdd, self, PickleSerializer()) + + def newAPIHadoopRDD(self, inputFormatClass, keyClass, valueClass, keyConverter=None, + valueConverter=None, conf=None): + """ + Read a 'new API' Hadoop InputFormat with arbitrary key and value class, from an arbitrary + Hadoop configuration, which is passed in as a Python dict. + This will be converted into a Configuration in Java. + The mechanism is the same as for sc.sequenceFile. + + @param inputFormatClass: fully qualified classname of Hadoop InputFormat + (e.g. "org.apache.hadoop.mapreduce.lib.input.TextInputFormat") + @param keyClass: fully qualified classname of key Writable class + (e.g. "org.apache.hadoop.io.Text") + @param valueClass: fully qualified classname of value Writable class + (e.g. "org.apache.hadoop.io.LongWritable") + @param keyConverter: (None by default) + @param valueConverter: (None by default) + @param conf: Hadoop configuration, passed in as a dict + (None by default) + """ + jconf = self._dictToJavaMap(conf) + jrdd = self._jvm.PythonRDD.newAPIHadoopRDD(self._jsc, inputFormatClass, keyClass, + valueClass, keyConverter, valueConverter, jconf) + return RDD(jrdd, self, PickleSerializer()) + + def hadoopFile(self, path, inputFormatClass, keyClass, valueClass, keyConverter=None, + valueConverter=None, conf=None): + """ + Read an 'old' Hadoop InputFormat with arbitrary key and value class from HDFS, + a local file system (available on all nodes), or any Hadoop-supported file system URI. + The mechanism is the same as for sc.sequenceFile. + + A Hadoop configuration can be passed in as a Python dict. This will be converted into a + Configuration in Java. + + @param path: path to Hadoop file + @param inputFormatClass: fully qualified classname of Hadoop InputFormat + (e.g. "org.apache.hadoop.mapred.TextInputFormat") + @param keyClass: fully qualified classname of key Writable class + (e.g. "org.apache.hadoop.io.Text") + @param valueClass: fully qualified classname of value Writable class + (e.g. "org.apache.hadoop.io.LongWritable") + @param keyConverter: (None by default) + @param valueConverter: (None by default) + @param conf: Hadoop configuration, passed in as a dict + (None by default) + """ + jconf = self._dictToJavaMap(conf) + jrdd = self._jvm.PythonRDD.hadoopFile(self._jsc, path, inputFormatClass, keyClass, + valueClass, keyConverter, valueConverter, jconf) + return RDD(jrdd, self, PickleSerializer()) + + def hadoopRDD(self, inputFormatClass, keyClass, valueClass, keyConverter=None, + valueConverter=None, conf=None): + """ + Read an 'old' Hadoop InputFormat with arbitrary key and value class, from an arbitrary + Hadoop configuration, which is passed in as a Python dict. + This will be converted into a Configuration in Java. + The mechanism is the same as for sc.sequenceFile. + + @param inputFormatClass: fully qualified classname of Hadoop InputFormat + (e.g. "org.apache.hadoop.mapred.TextInputFormat") + @param keyClass: fully qualified classname of key Writable class + (e.g. "org.apache.hadoop.io.Text") + @param valueClass: fully qualified classname of value Writable class + (e.g. 
"org.apache.hadoop.io.LongWritable") + @param keyConverter: (None by default) + @param valueConverter: (None by default) + @param conf: Hadoop configuration, passed in as a dict + (None by default) + """ + jconf = self._dictToJavaMap(conf) + jrdd = self._jvm.PythonRDD.hadoopRDD(self._jsc, inputFormatClass, keyClass, valueClass, + keyConverter, valueConverter, jconf) + return RDD(jrdd, self, PickleSerializer()) + def _checkpointFile(self, name, input_deserializer): jrdd = self._jsc.checkpointFile(name) return RDD(jrdd, self, input_deserializer) diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py index 1f2a6ea941cf2..184ee810b861b 100644 --- a/python/pyspark/tests.py +++ b/python/pyspark/tests.py @@ -198,6 +198,151 @@ def func(x): self.sc.parallelize([1]).foreach(func) +class TestInputFormat(PySparkTestCase): + + def setUp(self): + PySparkTestCase.setUp(self) + self.tempdir = tempfile.NamedTemporaryFile(delete=False) + os.unlink(self.tempdir.name) + self.sc._jvm.WriteInputFormatTestDataGenerator.generateData(self.tempdir.name, self.sc._jsc) + + def tearDown(self): + PySparkTestCase.tearDown(self) + shutil.rmtree(self.tempdir.name) + + def test_sequencefiles(self): + basepath = self.tempdir.name + ints = sorted(self.sc.sequenceFile(basepath + "/sftestdata/sfint/", + "org.apache.hadoop.io.IntWritable", + "org.apache.hadoop.io.Text").collect()) + ei = [(1, u'aa'), (1, u'aa'), (2, u'aa'), (2, u'bb'), (2, u'bb'), (3, u'cc')] + self.assertEqual(ints, ei) + + doubles = sorted(self.sc.sequenceFile(basepath + "/sftestdata/sfdouble/", + "org.apache.hadoop.io.DoubleWritable", + "org.apache.hadoop.io.Text").collect()) + ed = [(1.0, u'aa'), (1.0, u'aa'), (2.0, u'aa'), (2.0, u'bb'), (2.0, u'bb'), (3.0, u'cc')] + self.assertEqual(doubles, ed) + + text = sorted(self.sc.sequenceFile(basepath + "/sftestdata/sftext/", + "org.apache.hadoop.io.Text", + "org.apache.hadoop.io.Text").collect()) + et = [(u'1', u'aa'), + (u'1', u'aa'), + (u'2', u'aa'), + (u'2', u'bb'), + (u'2', u'bb'), + (u'3', u'cc')] + self.assertEqual(text, et) + + bools = sorted(self.sc.sequenceFile(basepath + "/sftestdata/sfbool/", + "org.apache.hadoop.io.IntWritable", + "org.apache.hadoop.io.BooleanWritable").collect()) + eb = [(1, False), (1, True), (2, False), (2, False), (2, True), (3, True)] + self.assertEqual(bools, eb) + + nulls = sorted(self.sc.sequenceFile(basepath + "/sftestdata/sfnull/", + "org.apache.hadoop.io.IntWritable", + "org.apache.hadoop.io.BooleanWritable").collect()) + en = [(1, None), (1, None), (2, None), (2, None), (2, None), (3, None)] + self.assertEqual(nulls, en) + + maps = sorted(self.sc.sequenceFile(basepath + "/sftestdata/sfmap/", + "org.apache.hadoop.io.IntWritable", + "org.apache.hadoop.io.MapWritable").collect()) + em = [(1, {2.0: u'aa'}), + (1, {3.0: u'bb'}), + (2, {1.0: u'aa'}), + (2, {1.0: u'cc'}), + (2, {3.0: u'bb'}), + (3, {2.0: u'dd'})] + self.assertEqual(maps, em) + + clazz = sorted(self.sc.sequenceFile(basepath + "/sftestdata/sfclass/", + "org.apache.hadoop.io.Text", + "org.apache.spark.api.python.TestWritable").collect()) + ec = (u'1', + {u'__class__': u'org.apache.spark.api.python.TestWritable', + u'double': 54.0, u'int': 123, u'str': u'test1'}) + self.assertEqual(clazz[0], ec) + + def test_oldhadoop(self): + basepath = self.tempdir.name + ints = sorted(self.sc.hadoopFile(basepath + "/sftestdata/sfint/", + "org.apache.hadoop.mapred.SequenceFileInputFormat", + "org.apache.hadoop.io.IntWritable", + "org.apache.hadoop.io.Text").collect()) + ei = [(1, u'aa'), (1, u'aa'), (2, u'aa'), (2, u'bb'), 
(2, u'bb'), (3, u'cc')] + self.assertEqual(ints, ei) + + hellopath = os.path.join(SPARK_HOME, "python/test_support/hello.txt") + hello = self.sc.hadoopFile(hellopath, + "org.apache.hadoop.mapred.TextInputFormat", + "org.apache.hadoop.io.LongWritable", + "org.apache.hadoop.io.Text").collect() + result = [(0, u'Hello World!')] + self.assertEqual(hello, result) + + def test_newhadoop(self): + basepath = self.tempdir.name + ints = sorted(self.sc.newAPIHadoopFile( + basepath + "/sftestdata/sfint/", + "org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat", + "org.apache.hadoop.io.IntWritable", + "org.apache.hadoop.io.Text").collect()) + ei = [(1, u'aa'), (1, u'aa'), (2, u'aa'), (2, u'bb'), (2, u'bb'), (3, u'cc')] + self.assertEqual(ints, ei) + + hellopath = os.path.join(SPARK_HOME, "python/test_support/hello.txt") + hello = self.sc.newAPIHadoopFile(hellopath, + "org.apache.hadoop.mapreduce.lib.input.TextInputFormat", + "org.apache.hadoop.io.LongWritable", + "org.apache.hadoop.io.Text").collect() + result = [(0, u'Hello World!')] + self.assertEqual(hello, result) + + def test_newolderror(self): + basepath = self.tempdir.name + self.assertRaises(Exception, lambda: self.sc.hadoopFile( + basepath + "/sftestdata/sfint/", + "org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat", + "org.apache.hadoop.io.IntWritable", + "org.apache.hadoop.io.Text")) + + self.assertRaises(Exception, lambda: self.sc.newAPIHadoopFile( + basepath + "/sftestdata/sfint/", + "org.apache.hadoop.mapred.SequenceFileInputFormat", + "org.apache.hadoop.io.IntWritable", + "org.apache.hadoop.io.Text")) + + def test_bad_inputs(self): + basepath = self.tempdir.name + self.assertRaises(Exception, lambda: self.sc.sequenceFile( + basepath + "/sftestdata/sfint/", + "org.apache.hadoop.io.NotValidWritable", + "org.apache.hadoop.io.Text")) + self.assertRaises(Exception, lambda: self.sc.hadoopFile( + basepath + "/sftestdata/sfint/", + "org.apache.hadoop.mapred.NotValidInputFormat", + "org.apache.hadoop.io.IntWritable", + "org.apache.hadoop.io.Text")) + self.assertRaises(Exception, lambda: self.sc.newAPIHadoopFile( + basepath + "/sftestdata/sfint/", + "org.apache.hadoop.mapreduce.lib.input.NotValidInputFormat", + "org.apache.hadoop.io.IntWritable", + "org.apache.hadoop.io.Text")) + + def test_converter(self): + basepath = self.tempdir.name + maps = sorted(self.sc.sequenceFile( + basepath + "/sftestdata/sfmap/", + "org.apache.hadoop.io.IntWritable", + "org.apache.hadoop.io.MapWritable", + valueConverter="org.apache.spark.api.python.TestConverter").collect()) + em = [(1, [2.0]), (1, [3.0]), (2, [1.0]), (2, [1.0]), (2, [3.0]), (3, [2.0])] + self.assertEqual(maps, em) + + class TestDaemon(unittest.TestCase): def connect(self, port): from socket import socket, AF_INET, SOCK_STREAM From 08ed9ad81397b71206c4dc903bfb94b6105691ed Mon Sep 17 00:00:00 2001 From: Zongheng Yang Date: Tue, 10 Jun 2014 00:49:09 -0700 Subject: [PATCH 20/26] [SPARK-1508][SQL] Add SQLConf to SQLContext. This PR (1) introduces a new class SQLConf that stores key-value properties for a SQLContext (2) clean up the semantics of various forms of SET commands. The SQLConf class unlocks user-controllable optimization opportunities; for example, user can now override the number of partitions used during an Exchange. A SQLConf can be accessed and modified programmatically through its getters and setters. It can also be modified through SET commands executed by `sql()` or `hql()`. 
Note that users now have the ability to change a particular property for different queries inside the same Spark job, unlike settings configured in SparkConf. For SET commands: "SET" will return all properties currently set in a SQLConf, "SET key" will return the key-value pair (if set) or an undefined message, and "SET key=value" will call the setter on SQLConf, and if a HiveContext is used, it will be executed in Hive as well. Author: Zongheng Yang Closes #956 from concretevitamin/sqlconf and squashes the following commits: 4968c11 [Zongheng Yang] Very minor cleanup. d74dde5 [Zongheng Yang] Remove the redundant mkQueryExecution() method. c129b86 [Zongheng Yang] Merge remote-tracking branch 'upstream/master' into sqlconf 26c40eb [Zongheng Yang] Make SQLConf a trait and have SQLContext mix it in. dd19666 [Zongheng Yang] Update a comment. baa5d29 [Zongheng Yang] Remove default param for shuffle partitions accessor. 5f7e6d8 [Zongheng Yang] Add default num partitions. 22d9ed7 [Zongheng Yang] Fix output() of Set physical. Add SQLConf param accessor method. e9856c4 [Zongheng Yang] Use java.util.Collections.synchronizedMap on a Java HashMap. 88dd0c8 [Zongheng Yang] Remove redundant SET Keyword. 271f0b1 [Zongheng Yang] Minor change. f8983d1 [Zongheng Yang] Minor changes per review comments. 1ce8a5e [Zongheng Yang] Invoke runSqlHive() in SQLConf#get for the HiveContext case. b766af9 [Zongheng Yang] Remove a test. d52e1bd [Zongheng Yang] De-hardcode number of shuffle partitions for BasicOperators (read from SQLConf). 555599c [Zongheng Yang] Bullet-proof (relatively) parsing SET per review comment. c2067e8 [Zongheng Yang] Mark SQLContext transient and put it in a second param list. 2ea8cdc [Zongheng Yang] Wrap long line. 41d7f09 [Zongheng Yang] Fix imports. 13279e6 [Zongheng Yang] Refactor the logic of eagerly processing SET commands. b14b83e [Zongheng Yang] In a HiveContext, make SQLConf a subset of HiveConf. 6983180 [Zongheng Yang] Move a SET test to SQLQuerySuite and make it complete. 5b67985 [Zongheng Yang] New line at EOF. c651797 [Zongheng Yang] Add commands.scala. efd82db [Zongheng Yang] Clean up semantics of several cases of SET. c1017c2 [Zongheng Yang] WIP in changing SetCommand to take two Options (for different semantics of SETs). 0f00d86 [Zongheng Yang] Add a test for singleton set command in SQL. 41acd75 [Zongheng Yang] Add a test for hql() in HiveQuerySuite. 2276929 [Zongheng Yang] Fix default hive result for set commands in HiveComparisonTest. 3b0c71b [Zongheng Yang] Remove Parser for set commands. A few other fixes. d0c4578 [Zongheng Yang] Tmux typo. 0ecea46 [Zongheng Yang] Changes for HiveQl and HiveContext. ce22d80 [Zongheng Yang] Fix parsing issues. cb722c1 [Zongheng Yang] Finish up SQLConf patch. 4ebf362 [Zongheng Yang] First cut at SQLConf inside SQLContext. 
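A brief usage sketch of the interface described above (hedged; it assumes an already-constructed `SQLContext` named `sqlContext` and is not part of the patch itself):

```scala
// Programmatic setter/getter on the SQLConf mixed into SQLContext.
sqlContext.set("spark.sql.shuffle.partitions", "10")
assert(sqlContext.get("spark.sql.shuffle.partitions") == "10")

// The same property can be changed through a SET command issued via sql().
sqlContext.sql("SET spark.sql.shuffle.partitions=20")

// "SET key" returns the current binding; "SET" alone lists all key-value pairs.
sqlContext.sql("SET spark.sql.shuffle.partitions").collect().foreach(println)
```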
--- .../apache/spark/sql/catalyst/SqlParser.scala | 31 ++++++-- .../catalyst/plans/logical/LogicalPlan.scala | 12 ++- .../scala/org/apache/spark/sql/SQLConf.scala | 78 +++++++++++++++++++ .../org/apache/spark/sql/SQLContext.scala | 39 ++++++++-- .../apache/spark/sql/execution/Exchange.scala | 6 +- .../spark/sql/execution/SparkStrategies.scala | 11 +-- .../apache/spark/sql/execution/commands.scala | 35 +++++++++ .../org/apache/spark/sql/SQLConfSuite.scala | 71 +++++++++++++++++ .../org/apache/spark/sql/SQLQuerySuite.scala | 39 +++++++++- .../spark/sql/execution/PlannerSuite.scala | 4 +- .../apache/spark/sql/hive/HiveContext.scala | 73 +++++++++-------- .../org/apache/spark/sql/hive/HiveQl.scala | 13 +++- .../hive/execution/HiveComparisonTest.scala | 3 + .../sql/hive/execution/HiveQuerySuite.scala | 75 ++++++++++++++++++ 14 files changed, 429 insertions(+), 61 deletions(-) create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/SQLConfSuite.scala diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala index cc650128c2c3f..36758f3114e59 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala @@ -41,10 +41,25 @@ import org.apache.spark.sql.catalyst.types._ * for a SQL like language should checkout the HiveQL support in the sql/hive sub-project. */ class SqlParser extends StandardTokenParsers with PackratParsers { + def apply(input: String): LogicalPlan = { - phrase(query)(new lexical.Scanner(input)) match { - case Success(r, x) => r - case x => sys.error(x.toString) + // Special-case out set commands since the value fields can be + // complex to handle without RegexParsers. Also this approach + // is clearer for the several possible cases of set commands. 
+ if (input.trim.toLowerCase.startsWith("set")) { + input.trim.drop(3).split("=", 2).map(_.trim) match { + case Array("") => // "set" + SetCommand(None, None) + case Array(key) => // "set key" + SetCommand(Some(key), None) + case Array(key, value) => // "set key=value" + SetCommand(Some(key), Some(value)) + } + } else { + phrase(query)(new lexical.Scanner(input)) match { + case Success(r, x) => r + case x => sys.error(x.toString) + } } } @@ -169,11 +184,13 @@ class SqlParser extends StandardTokenParsers with PackratParsers { } } - protected lazy val query: Parser[LogicalPlan] = + protected lazy val query: Parser[LogicalPlan] = ( select * ( - UNION ~ ALL ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Union(q1, q2) } | - UNION ~ opt(DISTINCT) ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Distinct(Union(q1, q2)) } - ) | insert + UNION ~ ALL ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Union(q1, q2) } | + UNION ~ opt(DISTINCT) ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Distinct(Union(q1, q2)) } + ) + | insert + ) protected lazy val select: Parser[LogicalPlan] = SELECT ~> opt(DISTINCT) ~ projections ~ diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala index 4f641cd3a656b..7eeb98aea6368 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala @@ -102,7 +102,7 @@ abstract class LeafNode extends LogicalPlan with trees.LeafNode[LogicalPlan] { */ abstract class Command extends LeafNode { self: Product => - def output: Seq[Attribute] = Seq.empty + def output: Seq[Attribute] = Seq.empty // TODO: SPARK-2081 should fix this } /** @@ -111,6 +111,16 @@ abstract class Command extends LeafNode { */ case class NativeCommand(cmd: String) extends Command +/** + * Commands of the form "SET (key) (= value)". + */ +case class SetCommand(key: Option[String], value: Option[String]) extends Command { + override def output = Seq( + AttributeReference("key", StringType, nullable = false)(), + AttributeReference("value", StringType, nullable = false)() + ) +} + /** * Returned by a parser when the users only wants to see what query plan would be executed, without * actually performing the execution. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala new file mode 100644 index 0000000000000..b378252ba2f55 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql + +import java.util.Properties + +import scala.collection.JavaConverters._ + +/** + * SQLConf holds mutable config parameters and hints. These can be set and + * queried either by passing SET commands into Spark SQL's DSL + * functions (sql(), hql(), etc.), or by programmatically using setters and + * getters of this class. This class is thread-safe. + */ +trait SQLConf { + + /** Number of partitions to use for shuffle operators. */ + private[spark] def numShufflePartitions: Int = get("spark.sql.shuffle.partitions", "200").toInt + + @transient + private val settings = java.util.Collections.synchronizedMap( + new java.util.HashMap[String, String]()) + + def set(props: Properties): Unit = { + props.asScala.foreach { case (k, v) => this.settings.put(k, v) } + } + + def set(key: String, value: String): Unit = { + require(key != null, "key cannot be null") + require(value != null, s"value cannot be null for ${key}") + settings.put(key, value) + } + + def get(key: String): String = { + if (!settings.containsKey(key)) { + throw new NoSuchElementException(key) + } + settings.get(key) + } + + def get(key: String, defaultValue: String): String = { + if (!settings.containsKey(key)) defaultValue else settings.get(key) + } + + def getAll: Array[(String, String)] = settings.asScala.toArray + + def getOption(key: String): Option[String] = { + if (!settings.containsKey(key)) None else Some(settings.get(key)) + } + + def contains(key: String): Boolean = settings.containsKey(key) + + def toDebugString: String = { + settings.synchronized { + settings.asScala.toArray.sorted.map{ case (k, v) => s"$k=$v" }.mkString("\n") + } + } + + private[spark] def clear() { + settings.clear() + } + +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index fde4c485b58a0..021e0e8245a0d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.{ScalaReflection, dsl} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.types._ import org.apache.spark.sql.catalyst.optimizer.Optimizer -import org.apache.spark.sql.catalyst.plans.logical.{Subquery, LogicalPlan} +import org.apache.spark.sql.catalyst.plans.logical.{SetCommand, LogicalPlan} import org.apache.spark.sql.catalyst.rules.RuleExecutor import org.apache.spark.sql.columnar.InMemoryColumnarTableScan @@ -52,6 +52,7 @@ import org.apache.spark.sql.parquet.ParquetRelation @AlphaComponent class SQLContext(@transient val sparkContext: SparkContext) extends Logging + with SQLConf with dsl.ExpressionConversions with Serializable { @@ -190,6 +191,8 @@ class SQLContext(@transient val sparkContext: SparkContext) protected[sql] class SparkPlanner extends SparkStrategies { val sparkContext = self.sparkContext + def numPartitions = self.numShufflePartitions + val strategies: Seq[Strategy] = CommandStrategy(self) :: TakeOrdered :: @@ -246,6 +249,10 @@ class SQLContext(@transient val sparkContext: SparkContext) @transient protected[sql] val planner = new SparkPlanner + @transient + protected[sql] lazy val emptyResult = + sparkContext.parallelize(Seq(new GenericRow(Array[Any]()): Row), 1) + /** * Prepares a planned SparkPlan for execution by binding references to specific ordinals, and * inserting shuffle operations as needed. 
@@ -253,15 +260,10 @@ class SQLContext(@transient val sparkContext: SparkContext) @transient protected[sql] val prepareForExecution = new RuleExecutor[SparkPlan] { val batches = - Batch("Add exchange", Once, AddExchange) :: + Batch("Add exchange", Once, AddExchange(self)) :: Batch("Prepare Expressions", Once, new BindReferences[SparkPlan]) :: Nil } - // TODO: or should we make QueryExecution protected[sql]? - protected[sql] def mkQueryExecution(plan: LogicalPlan) = new QueryExecution { - val logical = plan - } - /** * The primary workflow for executing relational queries using Spark. Designed to allow easy * access to the intermediate phases of query execution for developers. @@ -269,6 +271,22 @@ class SQLContext(@transient val sparkContext: SparkContext) protected abstract class QueryExecution { def logical: LogicalPlan + def eagerlyProcess(plan: LogicalPlan): RDD[Row] = plan match { + case SetCommand(key, value) => + // Only this case needs to be executed eagerly. The other cases will + // be taken care of when the actual results are being extracted. + // In the case of HiveContext, sqlConf is overridden to also pass the + // pair into its HiveConf. + if (key.isDefined && value.isDefined) { + set(key.get, value.get) + } + // It doesn't matter what we return here, since this is only used + // to force the evaluation to happen eagerly. To query the results, + // one must use SchemaRDD operations to extract them. + emptyResult + case _ => executedPlan.execute() + } + lazy val analyzed = analyzer(logical) lazy val optimizedPlan = optimizer(analyzed) // TODO: Don't just pick the first one... @@ -276,7 +294,12 @@ class SQLContext(@transient val sparkContext: SparkContext) lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan) /** Internal version of the RDD. Avoids copies and has no schema */ - lazy val toRdd: RDD[Row] = executedPlan.execute() + lazy val toRdd: RDD[Row] = { + logical match { + case s: SetCommand => eagerlyProcess(s) + case _ => executedPlan.execute() + } + } protected def stringOrError[A](f: => A): String = try f.toString catch { case e: Throwable => e.toString } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala index 3b4acb72e87b5..cef294167f146 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.execution import org.apache.spark.annotation.DeveloperApi import org.apache.spark.{HashPartitioner, RangePartitioner, SparkConf} import org.apache.spark.rdd.ShuffledRDD -import org.apache.spark.sql.Row +import org.apache.spark.sql.{SQLConf, SQLContext, Row} import org.apache.spark.sql.catalyst.errors.attachTree import org.apache.spark.sql.catalyst.expressions.{MutableProjection, RowOrdering} import org.apache.spark.sql.catalyst.plans.physical._ @@ -86,9 +86,9 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una * [[catalyst.plans.physical.Distribution Distribution]] requirements for each operator by inserting * [[Exchange]] Operators where required. */ -private[sql] object AddExchange extends Rule[SparkPlan] { +private[sql] case class AddExchange(sqlContext: SQLContext) extends Rule[SparkPlan] { // TODO: Determine the number of partitions. 
- val numPartitions = 150 + def numPartitions = sqlContext.numShufflePartitions def apply(plan: SparkPlan): SparkPlan = plan.transformUp { case operator: SparkPlan => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 295c265b1673f..0455748d40eec 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.execution -import org.apache.spark.sql.{SQLContext, execution} +import org.apache.spark.sql.{SQLConf, SQLContext, execution} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.planning._ import org.apache.spark.sql.catalyst.plans._ @@ -193,8 +193,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { // Can we automate these 'pass through' operations? object BasicOperators extends Strategy { - // TODO: Set - val numPartitions = 200 + def numPartitions = self.numPartitions + def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { case logical.Distinct(child) => execution.Aggregate( @@ -234,11 +234,12 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { } } - // TODO: this should be merged with SPARK-1508's SetCommandStrategy case class CommandStrategy(context: SQLContext) extends Strategy { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { + case logical.SetCommand(key, value) => + Seq(execution.SetCommandPhysical(key, value, plan.output)(context)) case logical.ExplainCommand(child) => - val qe = context.mkQueryExecution(child) + val qe = context.executePlan(child) Seq(execution.ExplainCommandPhysical(qe.executedPlan, plan.output)(context)) case _ => Nil } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala index 5371d2f479e73..9364506691f38 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala @@ -17,10 +17,45 @@ package org.apache.spark.sql.execution +import org.apache.spark.annotation.DeveloperApi import org.apache.spark.rdd.RDD import org.apache.spark.sql.{SQLContext, Row} import org.apache.spark.sql.catalyst.expressions.{GenericRow, Attribute} +/** + * :: DeveloperApi :: + */ +@DeveloperApi +case class SetCommandPhysical(key: Option[String], value: Option[String], output: Seq[Attribute]) + (@transient context: SQLContext) extends LeafNode { + def execute(): RDD[Row] = (key, value) match { + // Set value for key k; the action itself would + // have been performed in QueryExecution eagerly. + case (Some(k), Some(v)) => context.emptyResult + // Query the value bound to key k. + case (Some(k), None) => + val resultString = context.getOption(k) match { + case Some(v) => s"$k=$v" + case None => s"$k is undefined" + } + context.sparkContext.parallelize(Seq(new GenericRow(Array[Any](resultString))), 1) + // Query all key-value pairs that are set in the SQLConf of the context. + case (None, None) => + val pairs = context.getAll + val rows = pairs.map { case (k, v) => + new GenericRow(Array[Any](s"$k=$v")) + }.toSeq + // Assume config parameters can fit into one split (machine) ;) + context.sparkContext.parallelize(rows, 1) + // The only other case is invalid semantics and is impossible. 
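Read together with the eager handling in QueryExecution, SetCommandPhysical gives SET three user-visible forms. A brief sketch of what each returns (key names are illustrative; results described in the comments):

```
// Assumes the sqlContext from the earlier sketch.
sqlContext.sql("SET some.key=some.value")   // stores the pair; the returned rows carry no information
sqlContext.sql("SET some.key").collect()    // one row containing "some.key=some.value"
sqlContext.sql("SET not.a.key").collect()   // one row containing "not.a.key is undefined"
sqlContext.sql("SET").collect()             // one "key=value" row per pair currently in SQLConf
```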
+ case _ => context.emptyResult + } +} + +/** + * :: DeveloperApi :: + */ +@DeveloperApi case class ExplainCommandPhysical(child: SparkPlan, output: Seq[Attribute]) (@transient context: SQLContext) extends UnaryNode { def execute(): RDD[Row] = { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLConfSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLConfSuite.scala new file mode 100644 index 0000000000000..5eb73a4eff980 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLConfSuite.scala @@ -0,0 +1,71 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.spark.sql + +import org.apache.spark.sql.test._ + +/* Implicits */ +import TestSQLContext._ + +class SQLConfSuite extends QueryTest { + + val testKey = "test.key.0" + val testVal = "test.val.0" + + test("programmatic ways of basic setting and getting") { + assert(getOption(testKey).isEmpty) + assert(getAll.toSet === Set()) + + set(testKey, testVal) + assert(get(testKey) == testVal) + assert(get(testKey, testVal + "_") == testVal) + assert(getOption(testKey) == Some(testVal)) + assert(contains(testKey)) + + // Tests SQLConf as accessed from a SQLContext is mutable after + // the latter is initialized, unlike SparkConf inside a SparkContext. + assert(TestSQLContext.get(testKey) == testVal) + assert(TestSQLContext.get(testKey, testVal + "_") == testVal) + assert(TestSQLContext.getOption(testKey) == Some(testVal)) + assert(TestSQLContext.contains(testKey)) + + clear() + } + + test("parse SQL set commands") { + sql(s"set $testKey=$testVal") + assert(get(testKey, testVal + "_") == testVal) + assert(TestSQLContext.get(testKey, testVal + "_") == testVal) + + sql("set mapred.reduce.tasks=20") + assert(get("mapred.reduce.tasks", "0") == "20") + sql("set mapred.reduce.tasks = 40") + assert(get("mapred.reduce.tasks", "0") == "40") + + val key = "spark.sql.key" + val vs = "val0,val_1,val2.3,my_table" + sql(s"set $key=$vs") + assert(get(key, "0") == vs) + + sql(s"set $key=") + assert(get(key, "0") == "") + + clear() + } + +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index d651b967a6c16..f2d850ad6aa56 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -361,6 +361,41 @@ class SQLQuerySuite extends QueryTest { (1, "abc"), (2, "abc"), (3, null))) - } - + } + + test("SET commands semantics using sql()") { + clear() + val testKey = "test.key.0" + val testVal = "test.val.0" + val nonexistentKey = "nonexistent" + + // "set" itself returns all config variables currently specified in SQLConf. 
+ assert(sql("SET").collect().size == 0) + + // "set key=val" + sql(s"SET $testKey=$testVal") + checkAnswer( + sql("SET"), + Seq(Seq(s"$testKey=$testVal")) + ) + + sql(s"SET ${testKey + testKey}=${testVal + testVal}") + checkAnswer( + sql("set"), + Seq( + Seq(s"$testKey=$testVal"), + Seq(s"${testKey + testKey}=${testVal + testVal}")) + ) + + // "set key" + checkAnswer( + sql(s"SET $testKey"), + Seq(Seq(s"$testKey=$testVal")) + ) + checkAnswer( + sql(s"SET $nonexistentKey"), + Seq(Seq(s"$nonexistentKey is undefined")) + ) + } + } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala index c563d63627544..df6b118360d01 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala @@ -30,8 +30,8 @@ class PlannerSuite extends FunSuite { test("unions are collapsed") { val query = testData.unionAll(testData).unionAll(testData).logicalPlan val planned = BasicOperators(query).head - val logicalUnions = query collect { case u: logical.Union => u} - val physicalUnions = planned collect { case u: execution.Union => u} + val logicalUnions = query collect { case u: logical.Union => u } + val physicalUnions = planned collect { case u: execution.Union => u } assert(logicalUnions.size === 2) assert(physicalUnions.size === 1) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index 4b97dc25acf89..64978215542ec 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -18,11 +18,11 @@ package org.apache.spark.sql package hive -import scala.language.implicitConversions - import java.io.{BufferedReader, File, InputStreamReader, PrintStream} import java.util.{ArrayList => JArrayList} +import scala.collection.JavaConversions._ +import scala.language.implicitConversions import scala.reflect.runtime.universe.TypeTag import org.apache.hadoop.hive.conf.HiveConf @@ -30,20 +30,15 @@ import org.apache.hadoop.hive.ql.Driver import org.apache.hadoop.hive.ql.processors._ import org.apache.hadoop.hive.ql.session.SessionState -import org.apache.spark.annotation.DeveloperApi import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.ScalaReflection import org.apache.spark.sql.catalyst.analysis.{Analyzer, OverrideCatalog} import org.apache.spark.sql.catalyst.expressions.GenericRow -import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, LowerCaseSchema} -import org.apache.spark.sql.catalyst.plans.logical.{NativeCommand, ExplainCommand} -import org.apache.spark.sql.catalyst.ScalaReflection +import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.types._ import org.apache.spark.sql.execution._ -/* Implicit conversions */ -import scala.collection.JavaConversions._ - /** * Starts up an instance of hive where metadata is stored locally. An in-process metadata data is * created with data stored in ./metadata. Warehouse data is stored in in ./warehouse. @@ -55,10 +50,9 @@ class LocalHiveContext(sc: SparkContext) extends HiveContext(sc) { /** Sets up the system initially or after a RESET command */ protected def configure() { - // TODO: refactor this so we can work with other databases. 
- runSqlHive( - s"set javax.jdo.option.ConnectionURL=jdbc:derby:;databaseName=$metastorePath;create=true") - runSqlHive("set hive.metastore.warehouse.dir=" + warehousePath) + set("javax.jdo.option.ConnectionURL", + s"jdbc:derby:;databaseName=$metastorePath;create=true") + set("hive.metastore.warehouse.dir", warehousePath) } configure() // Must be called before initializing the catalog below. @@ -129,12 +123,27 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { } } + /** + * SQLConf and HiveConf contracts: when the hive session is first initialized, params in + * HiveConf will get picked up by the SQLConf. Additionally, any properties set by + * set() or a SET command inside hql() or sql() will be set in the SQLConf *as well as* + * in the HiveConf. + */ @transient protected[hive] lazy val hiveconf = new HiveConf(classOf[SessionState]) - @transient protected[hive] lazy val sessionState = new SessionState(hiveconf) + @transient protected[hive] lazy val sessionState = { + val ss = new SessionState(hiveconf) + set(hiveconf.getAllProperties) // Have SQLConf pick up the initial set of HiveConf. + ss + } sessionState.err = new PrintStream(outputBuffer, true, "UTF-8") sessionState.out = new PrintStream(outputBuffer, true, "UTF-8") + override def set(key: String, value: String): Unit = { + super.set(key, value) + runSqlHive(s"SET $key=$value") + } + /* A catalyst metadata catalog that points to the Hive Metastore. */ @transient override protected[sql] lazy val catalog = new HiveMetastoreCatalog(this) with OverrideCatalog { @@ -236,30 +245,31 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { @transient override protected[sql] val planner = hivePlanner - @transient - protected lazy val emptyResult = - sparkContext.parallelize(Seq(new GenericRow(Array[Any]()): Row), 1) - /** Extends QueryExecution with hive specific features. */ protected[sql] abstract class QueryExecution extends super.QueryExecution { // TODO: Create mixin for the analyzer instead of overriding things here. 
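Stepping out of the QueryExecution internals for a moment: the set() override above is what keeps SQLConf and HiveConf in agreement, since every SQLConf write is replayed as a Hive SET statement. A sketch mirroring the HiveQuerySuite assertions added later in this patch (it uses the TestHive context; hiveconf is package-private, so the last check only compiles inside the org.apache.spark.sql.hive packages):

```
import org.apache.spark.sql.hive.test.TestHive._

val testKey = "spark.sql.key.usedfortestonly"
val testVal = "val0,val_1,val2.3,my_table"

hql(s"SET $testKey=$testVal")
assert(get(testKey, "") == testVal)          // visible through the SQLConf accessors
assert(hiveconf.get(testKey, "") == testVal) // and mirrored into the underlying HiveConf
```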
override lazy val optimizedPlan = optimizer(catalog.PreInsertionCasts(catalog.CreateTables(analyzed))) - override lazy val toRdd: RDD[Row] = - analyzed match { - case NativeCommand(cmd) => - val output = runSqlHive(cmd) + override lazy val toRdd: RDD[Row] = { + def processCmd(cmd: String): RDD[Row] = { + val output = runSqlHive(cmd) + if (output.size == 0) { + emptyResult + } else { + val asRows = output.map(r => new GenericRow(r.split("\t").asInstanceOf[Array[Any]])) + sparkContext.parallelize(asRows, 1) + } + } - if (output.size == 0) { - emptyResult - } else { - val asRows = output.map(r => new GenericRow(r.split("\t").asInstanceOf[Array[Any]])) - sparkContext.parallelize(asRows, 1) - } - case _ => - executedPlan.execute().map(_.copy()) + logical match { + case s: SetCommand => eagerlyProcess(s) + case _ => analyzed match { + case NativeCommand(cmd) => processCmd(cmd) + case _ => executedPlan.execute().map(_.copy()) + } } + } protected val primitiveTypes = Seq(StringType, IntegerType, LongType, DoubleType, FloatType, BooleanType, ByteType, @@ -305,7 +315,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { */ def stringResult(): Seq[String] = analyzed match { case NativeCommand(cmd) => runSqlHive(cmd) - case ExplainCommand(plan) => mkQueryExecution(plan).toString.split("\n") + case ExplainCommand(plan) => executePlan(plan).toString.split("\n") case query => val result: Seq[Seq[Any]] = toRdd.collect().toSeq // We need the types so we can output struct field names @@ -318,6 +328,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { override def simpleString: String = logical match { case _: NativeCommand => "" + case _: SetCommand => "" case _ => executedPlan.toString } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala index cc9e24a05740b..4e74d9bc909fa 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala @@ -207,8 +207,17 @@ private[hive] object HiveQl { /** Returns a LogicalPlan for a given HiveQL string. */ def parseSql(sql: String): LogicalPlan = { try { - if (sql.toLowerCase.startsWith("set")) { - NativeCommand(sql) + if (sql.trim.toLowerCase.startsWith("set")) { + // Split in two parts since we treat the part before the first "=" + // as key, and the part after as value, which may contain other "=" signs. + sql.trim.drop(3).split("=", 2).map(_.trim) match { + case Array("") => // "set" + SetCommand(None, None) + case Array(key) => // "set key" + SetCommand(Some(key), None) + case Array(key, value) => // "set key=value" + SetCommand(Some(key), Some(value)) + } } else if (sql.toLowerCase.startsWith("add jar")) { AddJar(sql.drop(8)) } else if (sql.toLowerCase.startsWith("add file")) { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala index 0f954103a85f2..357c7e654bd20 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala @@ -138,6 +138,9 @@ abstract class HiveComparisonTest val orderedAnswer = hiveQuery.logical match { // Clean out non-deterministic time schema info. + // Hack: Hive simply prints the result of a SET command to screen, + // and does not return it as a query answer. 
+ case _: SetCommand => Seq("0") case _: NativeCommand => answer.filterNot(nonDeterministicLine).filterNot(_ == "") case _: ExplainCommand => answer case plan => if (isSorted(plan)) answer else answer.sorted diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala index c56eee258047f..6c239b02ed09a 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.hive.execution +import org.apache.spark.sql.Row import org.apache.spark.sql.hive.test.TestHive._ import org.apache.spark.sql.hive.test.TestHive @@ -171,4 +172,78 @@ class HiveQuerySuite extends HiveComparisonTest { TestHive.reset() } + test("parse HQL set commands") { + // Adapted from its SQL counterpart. + val testKey = "spark.sql.key.usedfortestonly" + val testVal = "val0,val_1,val2.3,my_table" + + hql(s"set $testKey=$testVal") + assert(get(testKey, testVal + "_") == testVal) + + hql("set mapred.reduce.tasks=20") + assert(get("mapred.reduce.tasks", "0") == "20") + hql("set mapred.reduce.tasks = 40") + assert(get("mapred.reduce.tasks", "0") == "40") + + hql(s"set $testKey=$testVal") + assert(get(testKey, "0") == testVal) + + hql(s"set $testKey=") + assert(get(testKey, "0") == "") + } + + test("SET commands semantics for a HiveContext") { + // Adapted from its SQL counterpart. + val testKey = "spark.sql.key.usedfortestonly" + var testVal = "test.val.0" + val nonexistentKey = "nonexistent" + def fromRows(row: Array[Row]): Array[String] = row.map(_.getString(0)) + + clear() + + // "set" itself returns all config variables currently specified in SQLConf. + assert(hql("set").collect().size == 0) + + // "set key=val" + hql(s"SET $testKey=$testVal") + assert(fromRows(hql("SET").collect()) sameElements Array(s"$testKey=$testVal")) + assert(hiveconf.get(testKey, "") == testVal) + + hql(s"SET ${testKey + testKey}=${testVal + testVal}") + assert(fromRows(hql("SET").collect()) sameElements + Array( + s"$testKey=$testVal", + s"${testKey + testKey}=${testVal + testVal}")) + assert(hiveconf.get(testKey + testKey, "") == testVal + testVal) + + // "set key" + assert(fromRows(hql(s"SET $testKey").collect()) sameElements + Array(s"$testKey=$testVal")) + assert(fromRows(hql(s"SET $nonexistentKey").collect()) sameElements + Array(s"$nonexistentKey is undefined")) + + // Assert that sql() should have the same effects as hql() by repeating the above using sql(). + clear() + assert(sql("set").collect().size == 0) + + sql(s"SET $testKey=$testVal") + assert(fromRows(sql("SET").collect()) sameElements Array(s"$testKey=$testVal")) + assert(hiveconf.get(testKey, "") == testVal) + + sql(s"SET ${testKey + testKey}=${testVal + testVal}") + assert(fromRows(sql("SET").collect()) sameElements + Array( + s"$testKey=$testVal", + s"${testKey + testKey}=${testVal + testVal}")) + assert(hiveconf.get(testKey + testKey, "") == testVal + testVal) + + assert(fromRows(sql(s"SET $testKey").collect()) sameElements + Array(s"$testKey=$testVal")) + assert(fromRows(sql(s"SET $nonexistentKey").collect()) sameElements + Array(s"$nonexistentKey is undefined")) + } + + // Put tests that depend on specific Hive settings before these last two test, + // since they modify /clear stuff. 
+ } From a9a461c594fd20e46947e318095df60bddb67559 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Tue, 10 Jun 2014 01:14:44 -0700 Subject: [PATCH 21/26] Moved hiveOperators.scala to the right package folder The package is `org.apache.spark.sql.hive.execution`, while the file was placed under `sql/hive/src/main/scala/org/apache/spark/sql/hive/`. Author: Cheng Lian Closes #1029 from liancheng/moveHiveOperators and squashes the following commits: d632eb8 [Cheng Lian] Moved hiveOperators.scala to the right package folder --- .../org/apache/spark/sql/hive/{ => execution}/hiveOperators.scala | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename sql/hive/src/main/scala/org/apache/spark/sql/hive/{ => execution}/hiveOperators.scala (100%) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveOperators.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/hiveOperators.scala similarity index 100% rename from sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveOperators.scala rename to sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/hiveOperators.scala From 884ca718b24f0bbe93358f2a366463b4e4d31f49 Mon Sep 17 00:00:00 2001 From: witgo Date: Tue, 10 Jun 2014 10:34:57 -0500 Subject: [PATCH 22/26] [SPARK-1978] In some cases, spark-yarn does not automatically restart the failed container Author: witgo Closes #921 from witgo/allocateExecutors and squashes the following commits: bc3aa66 [witgo] review commit 8800eba [witgo] Merge branch 'master' of https://github.com/apache/spark into allocateExecutors 32ac7af [witgo] review commit 056b8c7 [witgo] Merge branch 'master' of https://github.com/apache/spark into allocateExecutors 04c6f7e [witgo] Merge branch 'master' into allocateExecutors aff827c [witgo] review commit 5c376e0 [witgo] Merge branch 'master' of https://github.com/apache/spark into allocateExecutors 1faf4f4 [witgo] Merge branch 'master' into allocateExecutors 3c464bd [witgo] add time limit to allocateExecutors e00b656 [witgo] In some cases, yarn does not automatically restart the container --- .../spark/deploy/yarn/ApplicationMaster.scala | 39 +++++++++++-------- .../spark/deploy/yarn/ExecutorLauncher.scala | 22 ++++++----- 2 files changed, 34 insertions(+), 27 deletions(-) diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index c1dfe3f53b40b..33a60d978c586 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -252,15 +252,12 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, try { logInfo("Allocating " + args.numExecutors + " executors.") // Wait until all containers have finished - // TODO: This is a bit ugly. Can we make it nicer? - // TODO: Handle container failure yarnAllocator.addResourceRequests(args.numExecutors) + yarnAllocator.allocateResources() // Exits the loop if the user thread exits. 
while (yarnAllocator.getNumExecutorsRunning < args.numExecutors && userThread.isAlive) { - if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) { - finishApplicationMaster(FinalApplicationStatus.FAILED, - "max number of executor failures reached") - } + checkNumExecutorsFailed() + allocateMissingExecutor() yarnAllocator.allocateResources() ApplicationMaster.incrementAllocatorLoop(1) Thread.sleep(100) @@ -289,23 +286,31 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, } } + private def allocateMissingExecutor() { + val missingExecutorCount = args.numExecutors - yarnAllocator.getNumExecutorsRunning - + yarnAllocator.getNumPendingAllocate + if (missingExecutorCount > 0) { + logInfo("Allocating %d containers to make up for (potentially) lost containers". + format(missingExecutorCount)) + yarnAllocator.addResourceRequests(missingExecutorCount) + } + } + + private def checkNumExecutorsFailed() { + if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) { + finishApplicationMaster(FinalApplicationStatus.FAILED, + "max number of executor failures reached") + } + } + private def launchReporterThread(_sleepTime: Long): Thread = { val sleepTime = if (_sleepTime <= 0) 0 else _sleepTime val t = new Thread { override def run() { while (userThread.isAlive) { - if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) { - finishApplicationMaster(FinalApplicationStatus.FAILED, - "max number of executor failures reached") - } - val missingExecutorCount = args.numExecutors - yarnAllocator.getNumExecutorsRunning - - yarnAllocator.getNumPendingAllocate - if (missingExecutorCount > 0) { - logInfo("Allocating %d containers to make up for (potentially) lost containers". - format(missingExecutorCount)) - yarnAllocator.addResourceRequests(missingExecutorCount) - } + checkNumExecutorsFailed() + allocateMissingExecutor() sendProgress() Thread.sleep(sleepTime) } diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala index a4ce8766d347c..d93e5bb0225d5 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala @@ -200,17 +200,25 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp logInfo("Allocating " + args.numExecutors + " executors.") // Wait until all containers have finished - // TODO: This is a bit ugly. Can we make it nicer? - // TODO: Handle container failure - yarnAllocator.addResourceRequests(args.numExecutors) + yarnAllocator.allocateResources() while ((yarnAllocator.getNumExecutorsRunning < args.numExecutors) && (!driverClosed)) { + allocateMissingExecutor() yarnAllocator.allocateResources() Thread.sleep(100) } logInfo("All executors have launched.") + } + private def allocateMissingExecutor() { + val missingExecutorCount = args.numExecutors - yarnAllocator.getNumExecutorsRunning - + yarnAllocator.getNumPendingAllocate + if (missingExecutorCount > 0) { + logInfo("Allocating %d containers to make up for (potentially) lost containers". + format(missingExecutorCount)) + yarnAllocator.addResourceRequests(missingExecutorCount) + } } // TODO: We might want to extend this to allocate more containers in case they die ! 
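The re-allocation logic factored out above is simple bookkeeping over three counters; a small sketch with illustrative numbers shows what allocateMissingExecutor ends up asking YARN for:

```
// Illustrative numbers only; the real values come from the yarnAllocator.
val requested = 10  // args.numExecutors
val running   = 6   // yarnAllocator.getNumExecutorsRunning
val pending   = 1   // yarnAllocator.getNumPendingAllocate

val missing = requested - running - pending
if (missing > 0) {
  // addResourceRequests(missing) would request 3 more containers here,
  // replacing any that failed or were lost since the last round.
  println(s"Allocating $missing containers to make up for (potentially) lost containers")
}
```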
@@ -220,13 +228,7 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp val t = new Thread { override def run() { while (!driverClosed) { - val missingExecutorCount = args.numExecutors - yarnAllocator.getNumExecutorsRunning - - yarnAllocator.getNumPendingAllocate - if (missingExecutorCount > 0) { - logInfo("Allocating %d containers to make up for (potentially) lost containers". - format(missingExecutorCount)) - yarnAllocator.addResourceRequests(missingExecutorCount) - } + allocateMissingExecutor() sendProgress() Thread.sleep(sleepTime) } From db0c038a66cb228bcb62a5607cd0ed013d0f9f20 Mon Sep 17 00:00:00 2001 From: Cheng Hao Date: Tue, 10 Jun 2014 12:59:52 -0700 Subject: [PATCH 23/26] [SPARK-2076][SQL] Pushdown the join filter & predication for outer join As the rule described in https://cwiki.apache.org/confluence/display/Hive/OuterJoinBehavior, we can optimize the SQL Join by pushing down the Join predicate and Where predicate. Author: Cheng Hao Closes #1015 from chenghao-intel/join_predicate_push_down and squashes the following commits: 10feff9 [Cheng Hao] fix bug of changing the join type in PredicatePushDownThroughJoin 44c6700 [Cheng Hao] Add logical to support pushdown the join filter 0bce426 [Cheng Hao] Pushdown the join filter & predicate for outer join --- .../sql/catalyst/optimizer/Optimizer.scala | 112 +++++++++-- .../optimizer/FilterPushdownSuite.scala | 187 +++++++++++++++++- 2 files changed, 277 insertions(+), 22 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 406ffd6801e98..ccb8245cc2e7d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -19,6 +19,10 @@ package org.apache.spark.sql.catalyst.optimizer import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.Inner +import org.apache.spark.sql.catalyst.plans.FullOuter +import org.apache.spark.sql.catalyst.plans.LeftOuter +import org.apache.spark.sql.catalyst.plans.RightOuter +import org.apache.spark.sql.catalyst.plans.LeftSemi import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules._ import org.apache.spark.sql.catalyst.types._ @@ -34,7 +38,7 @@ object Optimizer extends RuleExecutor[LogicalPlan] { Batch("Filter Pushdown", FixedPoint(100), CombineFilters, PushPredicateThroughProject, - PushPredicateThroughInnerJoin, + PushPredicateThroughJoin, ColumnPruning) :: Nil } @@ -254,28 +258,98 @@ object PushPredicateThroughProject extends Rule[LogicalPlan] { /** * Pushes down [[catalyst.plans.logical.Filter Filter]] operators where the `condition` can be - * evaluated using only the attributes of the left or right side of an inner join. Other + * evaluated using only the attributes of the left or right side of a join. Other * [[catalyst.plans.logical.Filter Filter]] conditions are moved into the `condition` of the * [[catalyst.plans.logical.Join Join]]. + * And also Pushes down the join filter, where the `condition` can be evaluated using only the + * attributes of the left or right side of sub query when applicable. 
+ * + * Check https://cwiki.apache.org/confluence/display/Hive/OuterJoinBehavior for more details */ -object PushPredicateThroughInnerJoin extends Rule[LogicalPlan] with PredicateHelper { +object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper { + // split the condition expression into 3 parts, + // (canEvaluateInLeftSide, canEvaluateInRightSide, haveToEvaluateWithBothSide) + private def split(condition: Seq[Expression], left: LogicalPlan, right: LogicalPlan) = { + val (leftEvaluateCondition, rest) = + condition.partition(_.references subsetOf left.outputSet) + val (rightEvaluateCondition, commonCondition) = + rest.partition(_.references subsetOf right.outputSet) + + (leftEvaluateCondition, rightEvaluateCondition, commonCondition) + } + def apply(plan: LogicalPlan): LogicalPlan = plan transform { - case f @ Filter(filterCondition, Join(left, right, Inner, joinCondition)) => - val allConditions = - splitConjunctivePredicates(filterCondition) ++ - joinCondition.map(splitConjunctivePredicates).getOrElse(Nil) - - // Split the predicates into those that can be evaluated on the left, right, and those that - // must be evaluated after the join. - val (rightConditions, leftOrJoinConditions) = - allConditions.partition(_.references subsetOf right.outputSet) - val (leftConditions, joinConditions) = - leftOrJoinConditions.partition(_.references subsetOf left.outputSet) - - // Build the new left and right side, optionally with the pushed down filters. - val newLeft = leftConditions.reduceLeftOption(And).map(Filter(_, left)).getOrElse(left) - val newRight = rightConditions.reduceLeftOption(And).map(Filter(_, right)).getOrElse(right) - Join(newLeft, newRight, Inner, joinConditions.reduceLeftOption(And)) + // push the where condition down into join filter + case f @ Filter(filterCondition, Join(left, right, joinType, joinCondition)) => + val (leftFilterConditions, rightFilterConditions, commonFilterCondition) = + split(splitConjunctivePredicates(filterCondition), left, right) + + joinType match { + case Inner => + // push down the single side `where` condition into respective sides + val newLeft = leftFilterConditions. + reduceLeftOption(And).map(Filter(_, left)).getOrElse(left) + val newRight = rightFilterConditions. + reduceLeftOption(And).map(Filter(_, right)).getOrElse(right) + val newJoinCond = (commonFilterCondition ++ joinCondition).reduceLeftOption(And) + + Join(newLeft, newRight, Inner, newJoinCond) + case RightOuter => + // push down the right side only `where` condition + val newLeft = left + val newRight = rightFilterConditions. + reduceLeftOption(And).map(Filter(_, right)).getOrElse(right) + val newJoinCond = joinCondition + val newJoin = Join(newLeft, newRight, RightOuter, newJoinCond) + + (leftFilterConditions ++ commonFilterCondition). + reduceLeftOption(And).map(Filter(_, newJoin)).getOrElse(newJoin) + case _ @ (LeftOuter | LeftSemi) => + // push down the left side only `where` condition + val newLeft = leftFilterConditions. + reduceLeftOption(And).map(Filter(_, left)).getOrElse(left) + val newRight = right + val newJoinCond = joinCondition + val newJoin = Join(newLeft, newRight, joinType, newJoinCond) + + (rightFilterConditions ++ commonFilterCondition). 
+ reduceLeftOption(And).map(Filter(_, newJoin)).getOrElse(newJoin) + case FullOuter => f // DO Nothing for Full Outer Join + } + + // push down the join filter into sub query scanning if applicable + case f @ Join(left, right, joinType, joinCondition) => + val (leftJoinConditions, rightJoinConditions, commonJoinCondition) = + split(joinCondition.map(splitConjunctivePredicates).getOrElse(Nil), left, right) + + joinType match { + case Inner => + // push down the single side only join filter for both sides sub queries + val newLeft = leftJoinConditions. + reduceLeftOption(And).map(Filter(_, left)).getOrElse(left) + val newRight = rightJoinConditions. + reduceLeftOption(And).map(Filter(_, right)).getOrElse(right) + val newJoinCond = commonJoinCondition.reduceLeftOption(And) + + Join(newLeft, newRight, Inner, newJoinCond) + case RightOuter => + // push down the left side only join filter for left side sub query + val newLeft = leftJoinConditions. + reduceLeftOption(And).map(Filter(_, left)).getOrElse(left) + val newRight = right + val newJoinCond = (rightJoinConditions ++ commonJoinCondition).reduceLeftOption(And) + + Join(newLeft, newRight, RightOuter, newJoinCond) + case _ @ (LeftOuter | LeftSemi) => + // push down the right side only join filter for right sub query + val newLeft = left + val newRight = rightJoinConditions. + reduceLeftOption(And).map(Filter(_, right)).getOrElse(right) + val newJoinCond = (leftJoinConditions ++ commonJoinCondition).reduceLeftOption(And) + + Join(newLeft, newRight, joinType, newJoinCond) + case FullOuter => f + } } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala index ef47850455a37..02cc665f8a8c7 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala @@ -20,11 +20,14 @@ package org.apache.spark.sql.catalyst.optimizer import org.apache.spark.sql.catalyst.analysis import org.apache.spark.sql.catalyst.analysis.EliminateAnalysisOperators import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.catalyst.plans.Inner +import org.apache.spark.sql.catalyst.plans.FullOuter +import org.apache.spark.sql.catalyst.plans.LeftOuter +import org.apache.spark.sql.catalyst.plans.RightOuter import org.apache.spark.sql.catalyst.rules._ - -/* Implicit conversions */ import org.apache.spark.sql.catalyst.dsl.plans._ import org.apache.spark.sql.catalyst.dsl.expressions._ +import org.junit.Test class FilterPushdownSuite extends OptimizerTest { @@ -35,7 +38,7 @@ class FilterPushdownSuite extends OptimizerTest { Batch("Filter Pushdown", Once, CombineFilters, PushPredicateThroughProject, - PushPredicateThroughInnerJoin) :: Nil + PushPredicateThroughJoin) :: Nil } val testRelation = LocalRelation('a.int, 'b.int, 'c.int) @@ -161,6 +164,184 @@ class FilterPushdownSuite extends OptimizerTest { comparePlans(optimized, correctAnswer) } + + test("joins: push down left outer join #1") { + val x = testRelation.subquery('x) + val y = testRelation.subquery('y) + + val originalQuery = { + x.join(y, LeftOuter) + .where("x.b".attr === 1 && "y.b".attr === 2) + } + + val optimized = Optimize(originalQuery.analyze) + val left = testRelation.where('b === 1) + val correctAnswer = + left.join(y, LeftOuter).where("y.b".attr === 2).analyze + + comparePlans(optimized, correctAnswer) + 
} + + test("joins: push down right outer join #1") { + val x = testRelation.subquery('x) + val y = testRelation.subquery('y) + + val originalQuery = { + x.join(y, RightOuter) + .where("x.b".attr === 1 && "y.b".attr === 2) + } + + val optimized = Optimize(originalQuery.analyze) + val right = testRelation.where('b === 2).subquery('d) + val correctAnswer = + x.join(right, RightOuter).where("x.b".attr === 1).analyze + + comparePlans(optimized, correctAnswer) + } + + test("joins: push down left outer join #2") { + val x = testRelation.subquery('x) + val y = testRelation.subquery('y) + + val originalQuery = { + x.join(y, LeftOuter, Some("x.b".attr === 1)) + .where("x.b".attr === 2 && "y.b".attr === 2) + } + + val optimized = Optimize(originalQuery.analyze) + val left = testRelation.where('b === 2).subquery('d) + val correctAnswer = + left.join(y, LeftOuter, Some("d.b".attr === 1)).where("y.b".attr === 2).analyze + + comparePlans(optimized, correctAnswer) + } + + test("joins: push down right outer join #2") { + val x = testRelation.subquery('x) + val y = testRelation.subquery('y) + + val originalQuery = { + x.join(y, RightOuter, Some("y.b".attr === 1)) + .where("x.b".attr === 2 && "y.b".attr === 2) + } + + val optimized = Optimize(originalQuery.analyze) + val right = testRelation.where('b === 2).subquery('d) + val correctAnswer = + x.join(right, RightOuter, Some("d.b".attr === 1)).where("x.b".attr === 2).analyze + + comparePlans(optimized, correctAnswer) + } + + test("joins: push down left outer join #3") { + val x = testRelation.subquery('x) + val y = testRelation.subquery('y) + + val originalQuery = { + x.join(y, LeftOuter, Some("y.b".attr === 1)) + .where("x.b".attr === 2 && "y.b".attr === 2) + } + + val optimized = Optimize(originalQuery.analyze) + val left = testRelation.where('b === 2).subquery('l) + val right = testRelation.where('b === 1).subquery('r) + val correctAnswer = + left.join(right, LeftOuter).where("r.b".attr === 2).analyze + + comparePlans(optimized, correctAnswer) + } + + test("joins: push down right outer join #3") { + val x = testRelation.subquery('x) + val y = testRelation.subquery('y) + + val originalQuery = { + x.join(y, RightOuter, Some("y.b".attr === 1)) + .where("x.b".attr === 2 && "y.b".attr === 2) + } + + val optimized = Optimize(originalQuery.analyze) + val right = testRelation.where('b === 2).subquery('r) + val correctAnswer = + x.join(right, RightOuter, Some("r.b".attr === 1)).where("x.b".attr === 2).analyze + + comparePlans(optimized, correctAnswer) + } + + test("joins: push down left outer join #4") { + val x = testRelation.subquery('x) + val y = testRelation.subquery('y) + + val originalQuery = { + x.join(y, LeftOuter, Some("y.b".attr === 1)) + .where("x.b".attr === 2 && "y.b".attr === 2 && "x.c".attr === "y.c".attr) + } + + val optimized = Optimize(originalQuery.analyze) + val left = testRelation.where('b === 2).subquery('l) + val right = testRelation.where('b === 1).subquery('r) + val correctAnswer = + left.join(right, LeftOuter).where("r.b".attr === 2 && "l.c".attr === "r.c".attr).analyze + + comparePlans(optimized, correctAnswer) + } + + test("joins: push down right outer join #4") { + val x = testRelation.subquery('x) + val y = testRelation.subquery('y) + + val originalQuery = { + x.join(y, RightOuter, Some("y.b".attr === 1)) + .where("x.b".attr === 2 && "y.b".attr === 2 && "x.c".attr === "y.c".attr) + } + + val optimized = Optimize(originalQuery.analyze) + val left = testRelation.subquery('l) + val right = testRelation.where('b === 2).subquery('r) + val 
correctAnswer = + left.join(right, RightOuter, Some("r.b".attr === 1)). + where("l.b".attr === 2 && "l.c".attr === "r.c".attr).analyze + + comparePlans(optimized, correctAnswer) + } + + test("joins: push down left outer join #5") { + val x = testRelation.subquery('x) + val y = testRelation.subquery('y) + + val originalQuery = { + x.join(y, LeftOuter, Some("y.b".attr === 1 && "x.a".attr === 3)) + .where("x.b".attr === 2 && "y.b".attr === 2 && "x.c".attr === "y.c".attr) + } + + val optimized = Optimize(originalQuery.analyze) + val left = testRelation.where('b === 2).subquery('l) + val right = testRelation.where('b === 1).subquery('r) + val correctAnswer = + left.join(right, LeftOuter, Some("l.a".attr===3)). + where("r.b".attr === 2 && "l.c".attr === "r.c".attr).analyze + + comparePlans(optimized, correctAnswer) + } + + test("joins: push down right outer join #5") { + val x = testRelation.subquery('x) + val y = testRelation.subquery('y) + + val originalQuery = { + x.join(y, RightOuter, Some("y.b".attr === 1 && "x.a".attr === 3)) + .where("x.b".attr === 2 && "y.b".attr === 2 && "x.c".attr === "y.c".attr) + } + + val optimized = Optimize(originalQuery.analyze) + val left = testRelation.where('a === 3).subquery('l) + val right = testRelation.where('b === 2).subquery('r) + val correctAnswer = + left.join(right, RightOuter, Some("r.b".attr === 1)). + where("l.b".attr === 2 && "l.c".attr === "r.c".attr).analyze + + comparePlans(optimized, correctAnswer) + } test("joins: can't push down") { val x = testRelation.subquery('x) From fb499be1ac935b6f91046ec8ff23ac1267c82342 Mon Sep 17 00:00:00 2001 From: Patrick Wendell Date: Tue, 10 Jun 2014 13:13:17 -0700 Subject: [PATCH 24/26] HOTFIX: Fix Python tests on Jenkins. Author: Patrick Wendell Closes #1036 from pwendell/jenkins-test and squashes the following commits: 9c99856 [Patrick Wendell] Better output during tests 71e7b74 [Patrick Wendell] Removing incorrect python path 74984db [Patrick Wendell] HOTFIX: Allow PySpark tests to run on Jenkins. --- bin/pyspark | 6 +++++- dev/run-tests | 3 --- python/run-tests | 11 +++++++---- 3 files changed, 12 insertions(+), 8 deletions(-) diff --git a/bin/pyspark b/bin/pyspark index d0fa56f31913f..114cbbc3a8a8e 100755 --- a/bin/pyspark +++ b/bin/pyspark @@ -86,6 +86,10 @@ else if [[ "$IPYTHON" = "1" ]]; then exec ipython $IPYTHON_OPTS else - exec "$PYSPARK_PYTHON" + if [[ -n $SPARK_TESTING ]]; then + exec "$PYSPARK_PYTHON" -m doctest + else + exec "$PYSPARK_PYTHON" + fi fi fi diff --git a/dev/run-tests b/dev/run-tests index 93d6692f83ca8..c82a47ebb618b 100755 --- a/dev/run-tests +++ b/dev/run-tests @@ -73,9 +73,6 @@ fi echo "=========================================================================" echo "Running PySpark tests" echo "=========================================================================" -if [ -z "$PYSPARK_PYTHON" ]; then - export PYSPARK_PYTHON=/usr/local/bin/python2.7 -fi ./python/run-tests echo "=========================================================================" diff --git a/python/run-tests b/python/run-tests index 36a96121cbc0d..3b4501178c89f 100755 --- a/python/run-tests +++ b/python/run-tests @@ -32,7 +32,8 @@ rm -f unit-tests.log rm -rf metastore warehouse function run_test() { - SPARK_TESTING=0 $FWDIR/bin/pyspark $1 2>&1 | tee -a > unit-tests.log + echo "Running test: $1" + SPARK_TESTING=1 $FWDIR/bin/pyspark $1 2>&1 | tee -a > unit-tests.log FAILED=$((PIPESTATUS[0]||$FAILED)) # Fail and exit on the first test failure. 
@@ -46,15 +47,17 @@ function run_test() { } +echo "Running PySpark tests. Output is in python/unit-tests.log." + run_test "pyspark/rdd.py" run_test "pyspark/context.py" run_test "pyspark/conf.py" if [ -n "$_RUN_SQL_TESTS" ]; then run_test "pyspark/sql.py" fi -run_test "-m doctest pyspark/broadcast.py" -run_test "-m doctest pyspark/accumulators.py" -run_test "-m doctest pyspark/serializers.py" +run_test "pyspark/broadcast.py" +run_test "pyspark/accumulators.py" +run_test "pyspark/serializers.py" run_test "pyspark/tests.py" run_test "pyspark/mllib/_common.py" run_test "pyspark/mllib/classification.py" From 55a0e87ee4655106d5e0ed799b11e77f68a17dbb Mon Sep 17 00:00:00 2001 From: Ankur Dave Date: Tue, 10 Jun 2014 13:15:06 -0700 Subject: [PATCH 25/26] HOTFIX: Increase time limit for Bagel test The test was timing out on some slow EC2 workers. Author: Ankur Dave Closes #1037 from ankurdave/bagel-test-time-limit and squashes the following commits: 67fd487 [Ankur Dave] Increase time limit for Bagel test --- bagel/src/test/scala/org/apache/spark/bagel/BagelSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/bagel/src/test/scala/org/apache/spark/bagel/BagelSuite.scala b/bagel/src/test/scala/org/apache/spark/bagel/BagelSuite.scala index 110bd0a9a0c41..55241d33cd3f0 100644 --- a/bagel/src/test/scala/org/apache/spark/bagel/BagelSuite.scala +++ b/bagel/src/test/scala/org/apache/spark/bagel/BagelSuite.scala @@ -80,7 +80,7 @@ class BagelSuite extends FunSuite with Assertions with BeforeAndAfter with Timeo test("large number of iterations") { // This tests whether jobs with a large number of iterations finish in a reasonable time, // because non-memoized recursion in RDD or DAGScheduler used to cause them to hang - failAfter(10 seconds) { + failAfter(30 seconds) { sc = new SparkContext("local", "test") val verts = sc.parallelize((1 to 4).map(id => (id.toString, new TestVertex(true, 0)))) val msgs = sc.parallelize(Array[(String, TestMessage)]()) @@ -101,7 +101,7 @@ class BagelSuite extends FunSuite with Assertions with BeforeAndAfter with Timeo sc = new SparkContext("local", "test") val verts = sc.parallelize((1 to 4).map(id => (id.toString, new TestVertex(true, 0)))) val msgs = sc.parallelize(Array[(String, TestMessage)]()) - val numSupersteps = 50 + val numSupersteps = 20 val result = Bagel.run(sc, verts, msgs, sc.defaultParallelism, StorageLevel.DISK_ONLY) { (self: TestVertex, msgs: Option[Array[TestMessage]], superstep: Int) => From 1abbde0e89131ad95e793ac1834c392db46b448e Mon Sep 17 00:00:00 2001 From: egraldlo Date: Tue, 10 Jun 2014 14:07:55 -0700 Subject: [PATCH 26/26] [SQL] Add average overflow test case from #978 By @egraldlo. Author: egraldlo Author: Michael Armbrust Closes #1033 from marmbrus/pr/978 and squashes the following commits: e228c5e [Michael Armbrust] Remove "test". 762aeaf [Michael Armbrust] Remove unneeded rule. More descriptive name for test table. d414cd7 [egraldlo] fommatting issues 1153f75 [egraldlo] do best to avoid overflowing in function avg(). 
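The overflow this test guards against is easy to reproduce with the same values the new largeAndSmallInts fixture uses; a minimal sketch of why avg() has to widen its accumulator before dividing:

```
// The three large values from the b = 1 group of largeAndSmallInts.
val xs = Seq(2147483644, 2147483645, 2147483646)

val naive   = xs.sum / xs.size                         // Int sum wraps around: 715827879, wrong
val widened = xs.map(_.toLong).sum.toDouble / xs.size  // 2.147483645E9, the answer the test expects
```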
--- .../scala/org/apache/spark/sql/SQLQuerySuite.scala | 6 ++++++ .../test/scala/org/apache/spark/sql/TestData.scala | 11 +++++++++++ 2 files changed, 17 insertions(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index f2d850ad6aa56..de02bbc7e4700 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -136,6 +136,12 @@ class SQLQuerySuite extends QueryTest { 2.0) } + test("average overflow") { + checkAnswer( + sql("SELECT AVG(a),b FROM largeAndSmallInts group by b"), + Seq((2147483645.0,1),(2.0,2))) + } + test("count") { checkAnswer( sql("SELECT COUNT(*) FROM testData2"), diff --git a/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala b/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala index 05de736bbce1b..330b20b315d63 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala @@ -30,6 +30,17 @@ object TestData { (1 to 100).map(i => TestData(i, i.toString))) testData.registerAsTable("testData") + case class LargeAndSmallInts(a: Int, b: Int) + val largeAndSmallInts: SchemaRDD = + TestSQLContext.sparkContext.parallelize( + LargeAndSmallInts(2147483644, 1) :: + LargeAndSmallInts(1, 2) :: + LargeAndSmallInts(2147483645, 1) :: + LargeAndSmallInts(2, 2) :: + LargeAndSmallInts(2147483646, 1) :: + LargeAndSmallInts(3, 2) :: Nil) + largeAndSmallInts.registerAsTable("largeAndSmallInts") + case class TestData2(a: Int, b: Int) val testData2: SchemaRDD = TestSQLContext.sparkContext.parallelize(