Change createTableAs to saveAsTable. Clean up api annotations.
marmbrus committed Apr 15, 2014
1 parent d07d94b commit 882afdf
Showing 5 changed files with 53 additions and 32 deletions.
37 changes: 26 additions & 11 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -23,7 +23,7 @@ import scala.reflect.runtime.universe.TypeTag
import org.apache.hadoop.conf.Configuration

import org.apache.spark.SparkContext
import org.apache.spark.annotation.{AlphaComponent, Experimental}
import org.apache.spark.annotation.{AlphaComponent, DeveloperApi, Experimental}
import org.apache.spark.rdd.RDD

import org.apache.spark.sql.catalyst.analysis._
@@ -72,12 +72,12 @@ class SQLContext(@transient val sparkContext: SparkContext)
new this.QueryExecution { val logical = plan }

/**
* :: Experimental ::
* :: DeveloperApi ::
* Allows catalyst LogicalPlans to be executed as a SchemaRDD. Note that the LogicalPlan
* interface is considered internal, and thus not guranteed to be stable. As a result, using
* them directly is not reccomended.
* interface is considered internal, and thus not guaranteed to be stable. As a result, using
* them directly is not recommended.
*/
@Experimental
@DeveloperApi
implicit def logicalPlanToSparkQuery(plan: LogicalPlan): SchemaRDD = new SchemaRDD(this, plan)
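A hedged sketch of what this conversion enables; the sqlContext value and the registered table "people" are placeholders, and UnresolvedRelation is used here purely to illustrate a hand-built plan:

import org.apache.spark.sql.SchemaRDD
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import sqlContext._

// A hand-written logical plan referring to a hypothetical registered table "people".
val plan = UnresolvedRelation(None, "people")

// logicalPlanToSparkQuery turns the plan into a SchemaRDD so it can be executed,
// but the plan nodes themselves remain internal, unstable APIs.
val results: SchemaRDD = plan
results.collect().foreach(println)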

/**
@@ -98,14 +98,27 @@ class SQLContext(@transient val sparkContext: SparkContext)

/**
* :: Experimental ::
*
* Creates an empty parquet file with the schema of class `A`, which can be registered as a table.
* This registered table can be used as the target of future `insertInto` operations.
*
* @param path
* @param allowExisting
* @param conf
* @tparam A
* {{{
* val sqlContext = new SQLContext(...)
* import sqlContext._
*
* case class Person(name: String, age: Int)
* createParquetFile[Person]("path/to/file.parquet").registerAsTable("people")
* sql("INSERT INTO people SELECT 'michael', 29")
* }}}
*
* @tparam A A case class type that describes the desired schema of the parquet file to be
* created.
* @param path The path where the directory containing parquet metadata should be created.
* Data inserted into this table will also be stored at this location.
* @param allowExisting When false, an exception will be thrown if this directory already exists.
* @param conf A Hadoop configuration object that can be used to specify options to the parquet
* output format.
*
* @group userf
*/
@Experimental
def createParquetFile[A <: Product : TypeTag](
@@ -235,9 +248,11 @@ class SQLContext(@transient val sparkContext: SparkContext)
}

/**
* :: DeveloperApi ::
* The primary workflow for executing relational queries using Spark. Designed to allow easy
* access to the intermediate phases of query execution for developers.
*/
@DeveloperApi
protected abstract class QueryExecution {
def logical: LogicalPlan

@@ -258,7 +273,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
override def toString: String =
s"""== Logical Plan ==
|${stringOrError(analyzed)}
|== Optimized Logical Plan
|== Optimized Logical Plan ==
|${stringOrError(optimizedPlan)}
|== Physical Plan ==
|${stringOrError(executedPlan)}
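A minimal sketch of how these phases can be inspected from user code, assuming an existing SQLContext named sqlContext and a registered table "people" (both placeholders):

import sqlContext._

val query = sql("SELECT name FROM people WHERE age > 21")

// queryExecution is the DeveloperApi hook into this workflow; printing it emits the
// "== Logical Plan ==", "== Optimized Logical Plan ==" and "== Physical Plan ==" sections.
println(query.queryExecution)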
11 changes: 4 additions & 7 deletions sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
@@ -18,7 +18,7 @@
package org.apache.spark.sql

import org.apache.spark.{Dependency, OneToOneDependency, Partition, TaskContext}
import org.apache.spark.annotation.{AlphaComponent, Experimental}
import org.apache.spark.annotation.{AlphaComponent, Experimental, DeveloperApi}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.expressions._
@@ -79,8 +79,6 @@ import org.apache.spark.sql.catalyst.types.BooleanType
* rdd.where('key === 1).orderBy('value.asc).select('key).collect()
* }}}
*
* @todo There is currently no support for creating SchemaRDDs from either Java or Python RDDs.
*
* @groupname Query Language Integrated Queries
* @groupdesc Query Functions that create new queries from SchemaRDDs. The
* result of all query functions is also a SchemaRDD, allowing multiple operations to be
@@ -286,11 +284,10 @@ class SchemaRDD(
new SchemaRDD(sqlContext, Generate(generator, join, outer, None, logicalPlan))

/**
* Returns this RDD as a SchemaRDD.
* Returns this RDD as a SchemaRDD. Intended primarily to force the invocation of the implicit
* conversion from a standard RDD to a SchemaRDD.
*
* @group schema
*/
def toSchemaRDD = this

/** FOR INTERNAL USE ONLY */
def analyze = sqlContext.analyzer(logicalPlan)
}
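A hedged example of the intended use, assuming a SparkContext named sc, a SQLContext named sqlContext with its implicits imported, and a hypothetical comma-separated people.txt:

import sqlContext._

case class Person(name: String, age: Int)

val people = sc.textFile("people.txt")
  .map(_.split(","))
  .map(p => Person(p(0), p(1).trim.toInt))

// toSchemaRDD simply returns `this`, but writing it forces the implicit
// RDD => SchemaRDD conversion so the language-integrated query methods become available.
val schemaRDD = people.toSchemaRDD
schemaRDD.where('age === 29).select('name).collect()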
27 changes: 17 additions & 10 deletions sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala
@@ -17,7 +17,7 @@

package org.apache.spark.sql

import org.apache.spark.annotation.Experimental
import org.apache.spark.annotation.{DeveloperApi, Experimental}
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.plans.logical._

@@ -31,14 +31,24 @@ trait SchemaRDDLike
private[sql] def baseSchemaRDD: SchemaRDD

/**
* :: DeveloperApi ::
* A lazily computed query execution workflow. All other RDD operations are passed
* through to the RDD that is produced by this workflow.
* through to the RDD that is produced by this workflow. This workflow is produced lazily because
* invoking the whole query optimization pipeline can be expensive.
*
* We want this to be lazy because invoking the whole query optimization pipeline can be
* expensive.
* The query execution is considered a Developer API as phases may be added or removed in future
* releases. This execution is only exposed to provide an interface for inspecting the various
* phases for debugging purposes. Applications should not depend on particular phases existing
* or producing any specific output, even for exactly the same query.
*
* Additionally, the RDD exposed by this execution is not designed for consumption by end users.
* In particular, it does not contain any schema information, and it reuses Row objects
* internally. This object reuse improves performance, but can make programming against the RDD
* more difficult. Instead end users should perform RDD operations on a SchemaRDD directly.
*/
@transient
protected[spark] lazy val queryExecution = sqlContext.executePlan(logicalPlan)
@DeveloperApi
lazy val queryExecution = sqlContext.executePlan(logicalPlan)
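A short sketch of the debugging use described above; query stands for any SchemaRDD, and the phase names are the QueryExecution members referenced elsewhere in this commit:

val qe = query.queryExecution   // public as of this change, but still a DeveloperApi

println(qe.analyzed)        // logical plan after analysis
println(qe.optimizedPlan)   // logical plan after the optimizer
println(qe.executedPlan)    // physical plan selected for execution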

override def toString =
s"""${super.toString}
@@ -68,7 +78,6 @@ trait SchemaRDDLike

/**
* :: Experimental ::
*
* Adds the rows from this RDD to the specified table, optionally overwriting the existing data.
*
* @group schema
@@ -80,7 +89,6 @@ trait SchemaRDDLike

/**
* :: Experimental ::
*
* Appends the rows from this RDD to the specified table.
*
* @group schema
@@ -90,7 +98,6 @@ trait SchemaRDDLike

/**
* :: Experimental ::
*
* Creates a table from the contents of this SchemaRDD. This will fail if the table already
* exists.
*
@@ -99,9 +106,9 @@ trait SchemaRDDLike
* an RDD out to a parquet file, and then register that file as a table. This "table" can then
* be the target of an `insertInto`.
*
* @param tableName
* @group schema
*/
@Experimental
def createTableAs(tableName: String): Unit =
def saveAsTable(tableName: String): Unit =
sqlContext.executePlan(InsertIntoCreatedTable(None, tableName, logicalPlan)).toRdd
}
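A hedged usage sketch for the renamed method, assuming a HiveContext named hiveContext and a hypothetical existing table src_people (saveAsTable needs a HiveContext-backed SchemaRDD; with a plain SQLContext the parquet-file route mentioned above applies instead):

import hiveContext._

val people = sql("SELECT name, age FROM src_people")

// Persists the result as a new table; this fails if "people_copy" already exists.
people.saveAsTable("people_copy")

// Later batches can be appended with insertInto instead of creating the table again.
sql("SELECT name, age FROM src_people").insertInto("people_copy")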
@@ -38,22 +38,22 @@ class PlannerSuite extends FunSuite {
}

test("count is partially aggregated") {
val query = testData.groupBy('value)(Count('key)).analyze.logicalPlan
val query = testData.groupBy('value)(Count('key)).queryExecution.analyzed
val planned = PartialAggregation(query).head
val aggregations = planned.collect { case a: Aggregate => a }

assert(aggregations.size === 2)
}

test("count distinct is not partially aggregated") {
val query = testData.groupBy('value)(CountDistinct('key :: Nil)).analyze.logicalPlan
val planned = PartialAggregation(query.logicalPlan)
val query = testData.groupBy('value)(CountDistinct('key :: Nil)).queryExecution.analyzed
val planned = PartialAggregation(query)
assert(planned.isEmpty)
}

test("mixed aggregates are not partially aggregated") {
val query =
testData.groupBy('value)(Count('value), CountDistinct('key :: Nil)).analyze.logicalPlan
testData.groupBy('value)(Count('value), CountDistinct('key :: Nil)).queryExecution.analyzed
val planned = PartialAggregation(query)
assert(planned.isEmpty)
}
@@ -30,6 +30,7 @@ import org.apache.hadoop.hive.ql.Driver
import org.apache.hadoop.hive.ql.processors._
import org.apache.hadoop.hive.ql.session.SessionState

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.analysis.{Analyzer, OverrideCatalog}
@@ -238,6 +239,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
sparkContext.parallelize(Seq(new GenericRow(Array[Any]()): Row), 1)

/** Extends QueryExecution with hive specific features. */
@DeveloperApi
protected[sql] abstract class QueryExecution extends super.QueryExecution {
// TODO: Create mixin for the analyzer instead of overriding things here.
override lazy val optimizedPlan =
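Finally, a brief sketch of inspecting the Hive-specific execution, assuming a HiveContext named hiveContext and the classic Hive example table src (both placeholders, with hql as the HiveQL entry point of this era):

val q = hiveContext.hql("SELECT key, value FROM src")

// The overridden QueryExecution applies Hive-aware analysis and optimization
// (such as the optimizedPlan override above) before the physical plan is produced.
println(q.queryExecution)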
