Commit b0c252a

do not expose the external table concept in Catalog

cloud-fan committed Jan 10, 2017
1 parent acfc5f3 commit b0c252a

Showing 6 changed files with 196 additions and 111 deletions.
5 changes: 4 additions & 1 deletion project/MimaExcludes.scala
@@ -43,7 +43,10 @@ object MimaExcludes {
     ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.recoverPartitions"),

     // [SPARK-18537] Add a REST api to spark streaming
-    ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.streaming.scheduler.StreamingListener.onStreamingStarted")
+    ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.streaming.scheduler.StreamingListener.onStreamingStarted"),
+
+    // [SPARK-19148][SQL] do not expose the external table concept in Catalog
+    ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.createTable")
   )

   // Exclude rules for 2.1.x
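For context on this exclusion: MiMa (the Migration Manager) compares the new jars against the previous release, and a ReversedMissingMethodProblem means a method was added that implementors of the old API do not have. The sketch below, with hypothetical OldCatalog/MyCatalog classes standing in for Spark's, shows why adding the abstract createTable method trips this check:

```scala
// Hypothetical stand-ins, not Spark classes: this illustrates the binary
// incompatibility that MiMa reports as ReversedMissingMethodProblem.
abstract class OldCatalog {
  def listDatabases(): Seq[String]
}

// A third-party subclass compiled against the old release:
class MyCatalog extends OldCatalog {
  override def listDatabases(): Seq[String] = Nil
}

// If the next release adds an abstract method to OldCatalog, e.g.
//   def createTable(tableName: String, path: String): AnyRef
// then already-compiled subclasses like MyCatalog no longer implement the
// whole contract. Adding Catalog.createTable is exactly such a change, so
// the rule above registers it as an intentional break.
```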
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala

@@ -131,17 +131,15 @@ object GenerateOrdering extends CodeGenerator[Seq[SortOrder], Ordering[InternalRow]] with Logging {
         return 0;
       """
     },
-    foldFunctions = { funCalls =>
-      funCalls.zipWithIndex.map { case (funCall, i) =>
-        val comp = ctx.freshName("comp")
-        s"""
-          int $comp = $funCall;
-          if ($comp != 0) {
-            return $comp;
-          }
-        """
-      }.mkString
-    })
+    foldFunctions = _.map { funCall =>
+      val comp = ctx.freshName("comp")
+      s"""
+        int $comp = $funCall;
+        if ($comp != 0) {
+          return $comp;
+        }
+      """
+    }.mkString)
   }

   protected def create(ordering: Seq[SortOrder]): BaseOrdering = {
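This simplification leans on ctx.freshName returning a distinct name on every call, which made the zipWithIndex index dead code. A minimal sketch with a simplified stand-in for Spark's CodegenContext (the real class does more, such as name sanitization):

```scala
// Simplified stand-in for CodegenContext: a counter suffix alone is enough
// to keep generated variable names unique across calls.
class FreshNameContext {
  private var count = 0
  def freshName(prefix: String): String = { count += 1; s"${prefix}_$count" }
}

val ctx = new FreshNameContext
// Hypothetical comparison calls produced by earlier codegen steps:
val funCalls = Seq("compare_0(a, b)", "compare_1(a, b)")

// Same fold as the new foldFunctions: no index is threaded through, yet
// each emitted block still gets its own comp_N variable.
val folded = funCalls.map { funCall =>
  val comp = ctx.freshName("comp")
  s"""
     |int $comp = $funCall;
     |if ($comp != 0) {
     |  return $comp;
     |}""".stripMargin
}.mkString

println(folded)
```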
129 changes: 109 additions & 20 deletions sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
@@ -17,6 +17,8 @@

 package org.apache.spark.sql.catalog

+import scala.collection.JavaConverters._
+
 import org.apache.spark.annotation.{Experimental, InterfaceStability}
 import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset}
 import org.apache.spark.sql.types.StructType
@@ -187,82 +189,169 @@ abstract class Catalog {
   def functionExists(dbName: String, functionName: String): Boolean

   /**
-   * :: Experimental ::
-   * Creates an external table from the given path and returns the corresponding DataFrame.
+   * Creates a table from the given path and returns the corresponding DataFrame.
    * It will use the default data source configured by spark.sql.sources.default.
    *
    * @since 2.0.0
    */
+  @deprecated("use createTable instead.", "2.2.0")
+  def createExternalTable(tableName: String, path: String): DataFrame = {
+    createTable(tableName, path)
+  }
+
+  /**
+   * :: Experimental ::
+   * Creates a table from the given path and returns the corresponding DataFrame.
+   * It will use the default data source configured by spark.sql.sources.default.
+   *
+   * @since 2.2.0
+   */
   @Experimental
   @InterfaceStability.Evolving
-  def createExternalTable(tableName: String, path: String): DataFrame
+  def createTable(tableName: String, path: String): DataFrame

   /**
-   * :: Experimental ::
-   * Creates an external table from the given path based on a data source
-   * and returns the corresponding DataFrame.
+   * Creates a table from the given path based on a data source and returns the corresponding
+   * DataFrame.
    *
    * @since 2.0.0
    */
+  @deprecated("use createTable instead.", "2.2.0")
+  def createExternalTable(tableName: String, path: String, source: String): DataFrame = {
+    createTable(tableName, path, source)
+  }
+
+  /**
+   * :: Experimental ::
+   * Creates a table from the given path based on a data source and returns the corresponding
+   * DataFrame.
+   *
+   * @since 2.2.0
+   */
   @Experimental
   @InterfaceStability.Evolving
-  def createExternalTable(tableName: String, path: String, source: String): DataFrame
+  def createTable(tableName: String, path: String, source: String): DataFrame

   /**
-   * :: Experimental ::
-   * Creates an external table from the given path based on a data source and a set of options.
+   * Creates a table from the given path based on a data source and a set of options.
    * Then, returns the corresponding DataFrame.
    *
    * @since 2.0.0
    */
+  @deprecated("use createTable instead.", "2.2.0")
+  def createExternalTable(
+      tableName: String,
+      source: String,
+      options: java.util.Map[String, String]): DataFrame = {
+    createTable(tableName, source, options)
+  }
+
+  /**
+   * :: Experimental ::
+   * Creates a table from the given path based on a data source and a set of options.
+   * Then, returns the corresponding DataFrame.
+   *
+   * @since 2.2.0
+   */
   @Experimental
   @InterfaceStability.Evolving
+  def createTable(
+      tableName: String,
+      source: String,
+      options: java.util.Map[String, String]): DataFrame = {
+    createTable(tableName, source, options.asScala.toMap)
+  }
+
+  /**
+   * (Scala-specific)
+   * Creates a table from the given path based on a data source and a set of options.
+   * Then, returns the corresponding DataFrame.
+   *
+   * @since 2.0.0
+   */
+  @deprecated("use createTable instead.", "2.2.0")
   def createExternalTable(
       tableName: String,
       source: String,
-      options: java.util.Map[String, String]): DataFrame
+      options: Map[String, String]): DataFrame = {
+    createTable(tableName, source, options)
+  }

   /**
    * :: Experimental ::
    * (Scala-specific)
-   * Creates an external table from the given path based on a data source and a set of options.
+   * Creates a table from the given path based on a data source and a set of options.
    * Then, returns the corresponding DataFrame.
    *
-   * @since 2.0.0
+   * @since 2.2.0
    */
   @Experimental
   @InterfaceStability.Evolving
-  def createExternalTable(
+  def createTable(
       tableName: String,
       source: String,
       options: Map[String, String]): DataFrame

   /**
-   * :: Experimental ::
-   * Create an external table from the given path based on a data source, a schema and
-   * a set of options. Then, returns the corresponding DataFrame.
+   * Create a table from the given path based on a data source, a schema and a set of options.
+   * Then, returns the corresponding DataFrame.
    *
    * @since 2.0.0
    */
+  @deprecated("use createTable instead.", "2.2.0")
+  def createExternalTable(
+      tableName: String,
+      source: String,
+      schema: StructType,
+      options: java.util.Map[String, String]): DataFrame = {
+    createTable(tableName, source, schema, options)
+  }
+
+  /**
+   * :: Experimental ::
+   * Create a table from the given path based on a data source, a schema and a set of options.
+   * Then, returns the corresponding DataFrame.
+   *
+   * @since 2.2.0
+   */
   @Experimental
   @InterfaceStability.Evolving
+  def createTable(
+      tableName: String,
+      source: String,
+      schema: StructType,
+      options: java.util.Map[String, String]): DataFrame = {
+    createTable(tableName, source, schema, options.asScala.toMap)
+  }
+
+  /**
+   * (Scala-specific)
+   * Create a table from the given path based on a data source, a schema and a set of options.
+   * Then, returns the corresponding DataFrame.
+   *
+   * @since 2.0.0
+   */
+  @deprecated("use createTable instead.", "2.2.0")
   def createExternalTable(
       tableName: String,
       source: String,
       schema: StructType,
-      options: java.util.Map[String, String]): DataFrame
+      options: Map[String, String]): DataFrame = {
+    createTable(tableName, source, schema, options)
+  }

   /**
    * :: Experimental ::
    * (Scala-specific)
-   * Create an external table from the given path based on a data source, a schema and
-   * a set of options. Then, returns the corresponding DataFrame.
+   * Create a table from the given path based on a data source, a schema and a set of options.
+   * Then, returns the corresponding DataFrame.
    *
-   * @since 2.0.0
+   * @since 2.2.0
    */
   @Experimental
   @InterfaceStability.Evolving
-  def createExternalTable(
+  def createTable(
       tableName: String,
       source: String,
       schema: StructType,
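The net API change: every createExternalTable overload survives as a deprecated forwarder, the java.util.Map overloads become concrete bridges (options.asScala.toMap) to the Scala-specific variants, and only the Scala-specific createTable methods remain abstract. A hedged usage sketch — the table names and paths are invented, and a local SparkSession is assumed:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{LongType, StructType}

val spark = SparkSession.builder().master("local[*]").appName("sketch").getOrCreate()

// Still compiles, but warns as of 2.2.0 and simply forwards to createTable:
spark.catalog.createExternalTable("logs_old", "/data/logs_old")

// Preferred spelling for the same path-based table:
spark.catalog.createTable("logs", "/data/logs")

// Source + schema + options variant; the "path" option is what will make
// the table external rather than managed (see CatalogImpl below):
spark.catalog.createTable(
  "events", "parquet", new StructType().add("id", LongType),
  Map("path" -> "/data/events"))
```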
sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala

@@ -71,15 +71,6 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boolean)
       options = table.storage.properties ++ pathOption,
       catalogTable = Some(tableWithDefaultOptions)).resolveRelation()

-    dataSource match {
-      case fs: HadoopFsRelation =>
-        if (table.tableType == CatalogTableType.EXTERNAL && fs.location.rootPaths.isEmpty) {
-          throw new AnalysisException(
-            "Cannot create a file-based external data source table without path")
-        }
-      case _ =>
-    }
-
     val partitionColumnNames = if (table.schema.nonEmpty) {
       table.partitionColumnNames
     } else {
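Presumably this guard became unreachable rather than wrong: with the CatalogImpl change below, a table is only marked EXTERNAL when a location was actually supplied, so a file-based EXTERNAL table with no path can no longer be constructed through this route. Restated as a toy predicate (tableTypeFor is a hypothetical helper, not Spark code):

```scala
// Hypothetical restatement of the new invariant; the real code inspects
// CatalogStorageFormat.locationUri (see CatalogImpl below).
def tableTypeFor(locationUri: Option[String]): String =
  if (locationUri.isDefined) "EXTERNAL" else "MANAGED"

assert(tableTypeFor(Some("/data/logs")) == "EXTERNAL") // a path was given
assert(tableTypeFor(None) == "MANAGED")                // no path to be missing
```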
sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala

@@ -17,7 +17,6 @@

 package org.apache.spark.sql.internal

-import scala.collection.JavaConverters._
 import scala.reflect.runtime.universe.TypeTag

 import org.apache.spark.annotation.Experimental
@@ -257,101 +256,74 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {

   /**
    * :: Experimental ::
-   * Creates an external table from the given path and returns the corresponding DataFrame.
+   * Creates a table from the given path and returns the corresponding DataFrame.
    * It will use the default data source configured by spark.sql.sources.default.
    *
    * @group ddl_ops
-   * @since 2.0.0
+   * @since 2.2.0
    */
   @Experimental
-  override def createExternalTable(tableName: String, path: String): DataFrame = {
+  override def createTable(tableName: String, path: String): DataFrame = {
     val dataSourceName = sparkSession.sessionState.conf.defaultDataSourceName
-    createExternalTable(tableName, path, dataSourceName)
+    createTable(tableName, path, dataSourceName)
   }

   /**
    * :: Experimental ::
-   * Creates an external table from the given path based on a data source
-   * and returns the corresponding DataFrame.
+   * Creates a table from the given path based on a data source and returns the corresponding
+   * DataFrame.
    *
    * @group ddl_ops
-   * @since 2.0.0
+   * @since 2.2.0
    */
   @Experimental
-  override def createExternalTable(tableName: String, path: String, source: String): DataFrame = {
-    createExternalTable(tableName, source, Map("path" -> path))
-  }
-
-  /**
-   * :: Experimental ::
-   * Creates an external table from the given path based on a data source and a set of options.
-   * Then, returns the corresponding DataFrame.
-   *
-   * @group ddl_ops
-   * @since 2.0.0
-   */
-  @Experimental
-  override def createExternalTable(
-      tableName: String,
-      source: String,
-      options: java.util.Map[String, String]): DataFrame = {
-    createExternalTable(tableName, source, options.asScala.toMap)
+  override def createTable(tableName: String, path: String, source: String): DataFrame = {
+    createTable(tableName, source, Map("path" -> path))
   }

   /**
    * :: Experimental ::
    * (Scala-specific)
-   * Creates an external table from the given path based on a data source and a set of options.
+   * Creates a table from the given path based on a data source and a set of options.
    * Then, returns the corresponding DataFrame.
    *
    * @group ddl_ops
-   * @since 2.0.0
+   * @since 2.2.0
    */
   @Experimental
-  override def createExternalTable(
+  override def createTable(
       tableName: String,
       source: String,
       options: Map[String, String]): DataFrame = {
-    createExternalTable(tableName, source, new StructType, options)
-  }
-
-  /**
-   * :: Experimental ::
-   * Create an external table from the given path based on a data source, a schema and
-   * a set of options. Then, returns the corresponding DataFrame.
-   *
-   * @group ddl_ops
-   * @since 2.0.0
-   */
-  @Experimental
-  override def createExternalTable(
-      tableName: String,
-      source: String,
-      schema: StructType,
-      options: java.util.Map[String, String]): DataFrame = {
-    createExternalTable(tableName, source, schema, options.asScala.toMap)
+    createTable(tableName, source, new StructType, options)
   }

   /**
    * :: Experimental ::
    * (Scala-specific)
-   * Create an external table from the given path based on a data source, a schema and
-   * a set of options. Then, returns the corresponding DataFrame.
+   * Create a table from the given path based on a data source, a schema and a set of options.
+   * Then, returns the corresponding DataFrame.
    *
    * @group ddl_ops
-   * @since 2.0.0
+   * @since 2.2.0
    */
   @Experimental
-  override def createExternalTable(
+  override def createTable(
       tableName: String,
       source: String,
       schema: StructType,
       options: Map[String, String]): DataFrame = {
     val tableIdent = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName)
+    val storage = DataSource.buildStorageFormatFromOptions(options)
+    val tableType = if (storage.locationUri.isDefined) {
+      CatalogTableType.EXTERNAL
+    } else {
+      CatalogTableType.MANAGED
+    }
     val tableDesc = CatalogTable(
       identifier = tableIdent,
-      tableType = CatalogTableType.EXTERNAL,
-      storage = DataSource.buildStorageFormatFromOptions(options),
+      tableType = tableType,
+      storage = storage,
       schema = schema,
       provider = Some(source)
     )
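So the external-vs-managed decision is now inferred from the options instead of being baked into the method name. One practical consequence, which is standard Spark/Hive semantics rather than anything in this diff: dropping a MANAGED table deletes its data, while dropping an EXTERNAL one leaves the files at the path alone. A sketch of both outcomes, reusing the spark session and invented paths from the example above:

```scala
import org.apache.spark.sql.types.{LongType, StructType}

val schema = new StructType().add("id", LongType)

// "path" present -> storage.locationUri is defined -> EXTERNAL:
spark.catalog.createTable("ext_events", "parquet", schema,
  Map("path" -> "/data/ext_events"))

// No location option -> MANAGED; Spark picks the location and owns the data:
spark.catalog.createTable("mgd_events", "parquet", schema,
  Map.empty[String, String])
```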
(The diff for the sixth changed file did not load in this capture.)