From b0c252a89499b5e2368505ce9ed4570cdc2a66dc Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Tue, 10 Jan 2017 20:29:36 +0800 Subject: [PATCH] do not expose the external table concept in Catalog --- project/MimaExcludes.scala | 5 +- .../codegen/GenerateOrdering.scala | 20 ++- .../apache/spark/sql/catalog/Catalog.scala | 129 +++++++++++++++--- .../command/createDataSourceTables.scala | 9 -- .../spark/sql/internal/CatalogImpl.scala | 78 ++++------- .../spark/sql/internal/CatalogSuite.scala | 66 ++++++--- 6 files changed, 196 insertions(+), 111 deletions(-) diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 2314d7f45cb21..e0ee00e6826ab 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -43,7 +43,10 @@ object MimaExcludes { ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.recoverPartitions"), // [SPARK-18537] Add a REST api to spark streaming - ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.streaming.scheduler.StreamingListener.onStreamingStarted") + ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.streaming.scheduler.StreamingListener.onStreamingStarted"), + + // [SPARK-19148][SQL] do not expose the external table concept in Catalog + ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.createTable") ) // Exclude rules for 2.1.x diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala index b7335f12b64b1..2ffd188fee1c1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala @@ -131,17 +131,15 @@ object GenerateOrdering extends CodeGenerator[Seq[SortOrder], Ordering[InternalR return 0; """ }, - foldFunctions = { funCalls => - funCalls.zipWithIndex.map { case (funCall, i) => - val comp = ctx.freshName("comp") - s""" - int $comp = $funCall; - if ($comp != 0) { - return $comp; - } - """ - }.mkString - }) + foldFunctions = _.map { funCall => + val comp = ctx.freshName("comp") + s""" + int $comp = $funCall; + if ($comp != 0) { + return $comp; + } + """ + }.mkString) } protected def create(ordering: Seq[SortOrder]): BaseOrdering = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala index 6b061f8ab2740..41e781ed186f4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.catalog +import scala.collection.JavaConverters._ + import org.apache.spark.annotation.{Experimental, InterfaceStability} import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset} import org.apache.spark.sql.types.StructType @@ -187,82 +189,169 @@ abstract class Catalog { def functionExists(dbName: String, functionName: String): Boolean /** - * :: Experimental :: - * Creates an external table from the given path and returns the corresponding DataFrame. + * Creates a table from the given path and returns the corresponding DataFrame. * It will use the default data source configured by spark.sql.sources.default. * * @since 2.0.0 */ + @deprecated("use createTable instead.", "2.2.0") + def createExternalTable(tableName: String, path: String): DataFrame = { + createTable(tableName, path) + } + + /** + * :: Experimental :: + * Creates a table from the given path and returns the corresponding DataFrame. + * It will use the default data source configured by spark.sql.sources.default. + * + * @since 2.2.0 + */ @Experimental @InterfaceStability.Evolving - def createExternalTable(tableName: String, path: String): DataFrame + def createTable(tableName: String, path: String): DataFrame /** - * :: Experimental :: - * Creates an external table from the given path based on a data source - * and returns the corresponding DataFrame. + * Creates a table from the given path based on a data source and returns the corresponding + * DataFrame. * * @since 2.0.0 */ + @deprecated("use createTable instead.", "2.2.0") + def createExternalTable(tableName: String, path: String, source: String): DataFrame = { + createTable(tableName, path, source) + } + + /** + * :: Experimental :: + * Creates a table from the given path based on a data source and returns the corresponding + * DataFrame. + * + * @since 2.2.0 + */ @Experimental @InterfaceStability.Evolving - def createExternalTable(tableName: String, path: String, source: String): DataFrame + def createTable(tableName: String, path: String, source: String): DataFrame /** - * :: Experimental :: - * Creates an external table from the given path based on a data source and a set of options. + * Creates a table from the given path based on a data source and a set of options. * Then, returns the corresponding DataFrame. * * @since 2.0.0 */ + @deprecated("use createTable instead.", "2.2.0") + def createExternalTable( + tableName: String, + source: String, + options: java.util.Map[String, String]): DataFrame = { + createTable(tableName, source, options) + } + + /** + * :: Experimental :: + * Creates a table from the given path based on a data source and a set of options. + * Then, returns the corresponding DataFrame. + * + * @since 2.2.0 + */ @Experimental @InterfaceStability.Evolving + def createTable( + tableName: String, + source: String, + options: java.util.Map[String, String]): DataFrame = { + createTable(tableName, source, options.asScala.toMap) + } + + /** + * (Scala-specific) + * Creates a table from the given path based on a data source and a set of options. + * Then, returns the corresponding DataFrame. + * + * @since 2.0.0 + */ + @deprecated("use createTable instead.", "2.2.0") def createExternalTable( tableName: String, source: String, - options: java.util.Map[String, String]): DataFrame + options: Map[String, String]): DataFrame = { + createTable(tableName, source, options) + } /** * :: Experimental :: * (Scala-specific) - * Creates an external table from the given path based on a data source and a set of options. + * Creates a table from the given path based on a data source and a set of options. * Then, returns the corresponding DataFrame. * - * @since 2.0.0 + * @since 2.2.0 */ @Experimental @InterfaceStability.Evolving - def createExternalTable( + def createTable( tableName: String, source: String, options: Map[String, String]): DataFrame /** * :: Experimental :: - * Create an external table from the given path based on a data source, a schema and - * a set of options. Then, returns the corresponding DataFrame. + * Create a table from the given path based on a data source, a schema and a set of options. + * Then, returns the corresponding DataFrame. * * @since 2.0.0 */ + @deprecated("use createTable instead.", "2.2.0") + def createExternalTable( + tableName: String, + source: String, + schema: StructType, + options: java.util.Map[String, String]): DataFrame = { + createTable(tableName, source, schema, options) + } + + /** + * :: Experimental :: + * Create a table from the given path based on a data source, a schema and a set of options. + * Then, returns the corresponding DataFrame. + * + * @since 2.2.0 + */ @Experimental @InterfaceStability.Evolving + def createTable( + tableName: String, + source: String, + schema: StructType, + options: java.util.Map[String, String]): DataFrame = { + createTable(tableName, source, schema, options.asScala.toMap) + } + + /** + * (Scala-specific) + * Create a table from the given path based on a data source, a schema and a set of options. + * Then, returns the corresponding DataFrame. + * + * @since 2.0.0 + */ + @deprecated("use createTable instead.", "2.2.0") def createExternalTable( tableName: String, source: String, schema: StructType, - options: java.util.Map[String, String]): DataFrame + options: Map[String, String]): DataFrame = { + createTable(tableName, source, schema, options) + } /** * :: Experimental :: * (Scala-specific) - * Create an external table from the given path based on a data source, a schema and - * a set of options. Then, returns the corresponding DataFrame. + * Create a table from the given path based on a data source, a schema and a set of options. + * Then, returns the corresponding DataFrame. * - * @since 2.0.0 + * @since 2.2.0 */ @Experimental @InterfaceStability.Evolving - def createExternalTable( + def createTable( tableName: String, source: String, schema: StructType, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala index 73b21533a26c7..6cf36a233ee5a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala @@ -71,15 +71,6 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo options = table.storage.properties ++ pathOption, catalogTable = Some(tableWithDefaultOptions)).resolveRelation() - dataSource match { - case fs: HadoopFsRelation => - if (table.tableType == CatalogTableType.EXTERNAL && fs.location.rootPaths.isEmpty) { - throw new AnalysisException( - "Cannot create a file-based external data source table without path") - } - case _ => - } - val partitionColumnNames = if (table.schema.nonEmpty) { table.partitionColumnNames } else { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala index 8244b2152c1c4..9136a83bc2d89 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala @@ -17,7 +17,6 @@ package org.apache.spark.sql.internal -import scala.collection.JavaConverters._ import scala.reflect.runtime.universe.TypeTag import org.apache.spark.annotation.Experimental @@ -257,101 +256,74 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog { /** * :: Experimental :: - * Creates an external table from the given path and returns the corresponding DataFrame. + * Creates a table from the given path and returns the corresponding DataFrame. * It will use the default data source configured by spark.sql.sources.default. * * @group ddl_ops - * @since 2.0.0 + * @since 2.2.0 */ @Experimental - override def createExternalTable(tableName: String, path: String): DataFrame = { + override def createTable(tableName: String, path: String): DataFrame = { val dataSourceName = sparkSession.sessionState.conf.defaultDataSourceName - createExternalTable(tableName, path, dataSourceName) + createTable(tableName, path, dataSourceName) } /** * :: Experimental :: - * Creates an external table from the given path based on a data source - * and returns the corresponding DataFrame. + * Creates a table from the given path based on a data source and returns the corresponding + * DataFrame. * * @group ddl_ops - * @since 2.0.0 + * @since 2.2.0 */ @Experimental - override def createExternalTable(tableName: String, path: String, source: String): DataFrame = { - createExternalTable(tableName, source, Map("path" -> path)) - } - - /** - * :: Experimental :: - * Creates an external table from the given path based on a data source and a set of options. - * Then, returns the corresponding DataFrame. - * - * @group ddl_ops - * @since 2.0.0 - */ - @Experimental - override def createExternalTable( - tableName: String, - source: String, - options: java.util.Map[String, String]): DataFrame = { - createExternalTable(tableName, source, options.asScala.toMap) + override def createTable(tableName: String, path: String, source: String): DataFrame = { + createTable(tableName, source, Map("path" -> path)) } /** * :: Experimental :: * (Scala-specific) - * Creates an external table from the given path based on a data source and a set of options. + * Creates a table from the given path based on a data source and a set of options. * Then, returns the corresponding DataFrame. * * @group ddl_ops - * @since 2.0.0 + * @since 2.2.0 */ @Experimental - override def createExternalTable( + override def createTable( tableName: String, source: String, options: Map[String, String]): DataFrame = { - createExternalTable(tableName, source, new StructType, options) - } - - /** - * :: Experimental :: - * Create an external table from the given path based on a data source, a schema and - * a set of options. Then, returns the corresponding DataFrame. - * - * @group ddl_ops - * @since 2.0.0 - */ - @Experimental - override def createExternalTable( - tableName: String, - source: String, - schema: StructType, - options: java.util.Map[String, String]): DataFrame = { - createExternalTable(tableName, source, schema, options.asScala.toMap) + createTable(tableName, source, new StructType, options) } /** * :: Experimental :: * (Scala-specific) - * Create an external table from the given path based on a data source, a schema and - * a set of options. Then, returns the corresponding DataFrame. + * Create a table from the given path based on a data source, a schema and a set of options. + * Then, returns the corresponding DataFrame. * * @group ddl_ops - * @since 2.0.0 + * @since 2.2.0 */ @Experimental - override def createExternalTable( + override def createTable( tableName: String, source: String, schema: StructType, options: Map[String, String]): DataFrame = { val tableIdent = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName) + val storage = DataSource.buildStorageFormatFromOptions(options) + val tableType = if (storage.locationUri.isDefined) { + CatalogTableType.EXTERNAL + } else { + CatalogTableType.MANAGED + } val tableDesc = CatalogTable( identifier = tableIdent, - tableType = CatalogTableType.EXTERNAL, - storage = DataSource.buildStorageFormatFromOptions(options), + tableType = tableType, + storage = storage, schema = schema, provider = Some(source) ) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala index 5dd04543ed144..801912f44174f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala @@ -17,6 +17,9 @@ package org.apache.spark.sql.internal +import java.io.File +import java.net.URI + import org.scalatest.BeforeAndAfterEach import org.apache.spark.SparkFunSuite @@ -27,7 +30,7 @@ import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionInfo} import org.apache.spark.sql.catalyst.plans.logical.Range import org.apache.spark.sql.test.SharedSQLContext -import org.apache.spark.sql.types.{IntegerType, StructType} +import org.apache.spark.sql.types.StructType /** @@ -37,6 +40,7 @@ class CatalogSuite extends SparkFunSuite with BeforeAndAfterEach with SharedSQLContext { + import testImplicits._ private def sessionCatalog: SessionCatalog = spark.sessionState.catalog @@ -306,22 +310,6 @@ class CatalogSuite columnFields.foreach { f => assert(columnString.contains(f.toString)) } } - test("createExternalTable should fail if path is not given for file-based data source") { - val e = intercept[AnalysisException] { - spark.catalog.createExternalTable("tbl", "json", Map.empty[String, String]) - } - assert(e.message.contains("Unable to infer schema")) - - val e2 = intercept[AnalysisException] { - spark.catalog.createExternalTable( - "tbl", - "json", - new StructType().add("i", IntegerType), - Map.empty[String, String]) - } - assert(e2.message == "Cannot create a file-based external data source table without path") - } - test("dropTempView should not un-cache and drop metastore table if a same-name table exists") { withTable("same_name") { spark.range(10).write.saveAsTable("same_name") @@ -460,6 +448,50 @@ class CatalogSuite } } + test("createTable with 'path' in options") { + withTable("t") { + withTempDir { dir => + spark.catalog.createTable( + tableName = "t", + source = "json", + schema = new StructType().add("i", "int"), + options = Map("path" -> dir.getAbsolutePath)) + val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t")) + assert(table.tableType == CatalogTableType.EXTERNAL) + assert(table.storage.locationUri.get == dir.getAbsolutePath) + + Seq((1)).toDF("i").write.insertInto("t") + assert(dir.exists() && dir.listFiles().nonEmpty) + + sql("DROP TABLE t") + // the table path and data files are still there after DROP TABLE, if custom table path is + // specified. + assert(dir.exists() && dir.listFiles().nonEmpty) + } + } + } + + test("createTable without 'path' in options") { + withTable("t") { + spark.catalog.createTable( + tableName = "t", + source = "json", + schema = new StructType().add("i", "int"), + options = Map.empty[String, String]) + val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t")) + assert(table.tableType == CatalogTableType.MANAGED) + val tablePath = new File(new URI(table.storage.locationUri.get)) + assert(tablePath.exists() && tablePath.listFiles().isEmpty) + + Seq((1)).toDF("i").write.insertInto("t") + assert(tablePath.listFiles().nonEmpty) + + sql("DROP TABLE t") + // the table path is removed after DROP TABLE, if custom table path is not specified. + assert(!tablePath.exists()) + } + } + // TODO: add tests for the rest of them }