diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala index dfab2398857e8..2595e1f90c837 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala @@ -170,7 +170,7 @@ object SqlParser extends AbstractSparkSQLParser with DataTypeParser { joinedRelation | relationFactor protected lazy val relationFactor: Parser[LogicalPlan] = - ( rep1sep(ident, ".") ~ (opt(AS) ~> opt(ident)) ^^ { + ( tableIdentifier ~ (opt(AS) ~> opt(ident)) ^^ { case tableIdent ~ alias => UnresolvedRelation(tableIdent, alias) } | ("(" ~> start <~ ")") ~ (AS.? ~> ident) ^^ { case s ~ a => Subquery(a, s) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/TableIdentifier.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/TableIdentifier.scala index d701559bf2d9b..4d4e4ded99477 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/TableIdentifier.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/TableIdentifier.scala @@ -20,14 +20,16 @@ package org.apache.spark.sql.catalyst /** * Identifies a `table` in `database`. If `database` is not defined, the current database is used. */ -private[sql] case class TableIdentifier(table: String, database: Option[String] = None) { - def withDatabase(database: String): TableIdentifier = this.copy(database = Some(database)) - - def toSeq: Seq[String] = database.toSeq :+ table +private[sql] case class TableIdentifier(table: String, database: Option[String]) { + def this(table: String) = this(table, None) override def toString: String = quotedString - def quotedString: String = toSeq.map("`" + _ + "`").mkString(".") + def quotedString: String = database.map(db => s"`$db`.`$table`").getOrElse(s"`$table`") + + def unquotedString: String = database.map(db => s"$db.$table").getOrElse(table) +} - def unquotedString: String = toSeq.mkString(".") +private[sql] object TableIdentifier { + def apply(tableName: String): TableIdentifier = new TableIdentifier(tableName) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 041ab22827399..e6046055bf0f6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -105,7 +105,7 @@ class Analyzer( // here use the CTE definition first, check table name only and ignore database name // see https://github.com/apache/spark/pull/4929#discussion_r27186638 for more info case u : UnresolvedRelation => - val substituted = cteRelations.get(u.tableIdentifier.last).map { relation => + val substituted = cteRelations.get(u.tableIdentifier.table).map { relation => val withAlias = u.alias.map(Subquery(_, relation)) withAlias.getOrElse(relation) } @@ -257,7 +257,7 @@ class Analyzer( catalog.lookupRelation(u.tableIdentifier, u.alias) } catch { case _: NoSuchTableException => - u.failAnalysis(s"no such table ${u.tableName}") + u.failAnalysis(s"Table Not Found: ${u.tableName}") } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala index 4cc9a5520a085..8f4ce74a2ea38 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala @@ -42,11 +42,9 @@ trait Catalog { val conf: CatalystConf - def tableExists(tableIdentifier: Seq[String]): Boolean + def tableExists(tableIdent: TableIdentifier): Boolean - def lookupRelation( - tableIdentifier: Seq[String], - alias: Option[String] = None): LogicalPlan + def lookupRelation(tableIdent: TableIdentifier, alias: Option[String] = None): LogicalPlan /** * Returns tuples of (tableName, isTemporary) for all tables in the given database. @@ -56,89 +54,59 @@ trait Catalog { def refreshTable(tableIdent: TableIdentifier): Unit - // TODO: Refactor it in the work of SPARK-10104 - def registerTable(tableIdentifier: Seq[String], plan: LogicalPlan): Unit + def registerTable(tableIdent: TableIdentifier, plan: LogicalPlan): Unit - // TODO: Refactor it in the work of SPARK-10104 - def unregisterTable(tableIdentifier: Seq[String]): Unit + def unregisterTable(tableIdent: TableIdentifier): Unit def unregisterAllTables(): Unit - // TODO: Refactor it in the work of SPARK-10104 - protected def processTableIdentifier(tableIdentifier: Seq[String]): Seq[String] = { - if (conf.caseSensitiveAnalysis) { - tableIdentifier - } else { - tableIdentifier.map(_.toLowerCase) - } - } - - // TODO: Refactor it in the work of SPARK-10104 - protected def getDbTableName(tableIdent: Seq[String]): String = { - val size = tableIdent.size - if (size <= 2) { - tableIdent.mkString(".") - } else { - tableIdent.slice(size - 2, size).mkString(".") - } - } - - // TODO: Refactor it in the work of SPARK-10104 - protected def getDBTable(tableIdent: Seq[String]) : (Option[String], String) = { - (tableIdent.lift(tableIdent.size - 2), tableIdent.last) - } - /** - * It is not allowed to specifiy database name for tables stored in [[SimpleCatalog]]. - * We use this method to check it. + * Get the table name of TableIdentifier for temporary tables. */ - protected def checkTableIdentifier(tableIdentifier: Seq[String]): Unit = { - if (tableIdentifier.length > 1) { + protected def getTableName(tableIdent: TableIdentifier): String = { + // It is not allowed to specify database name for temporary tables. + // We check it here and throw exception if database is defined. + if (tableIdent.database.isDefined) { throw new AnalysisException("Specifying database name or other qualifiers are not allowed " + "for temporary tables. If the table name has dots (.) in it, please quote the " + "table name with backticks (`).") } + if (conf.caseSensitiveAnalysis) { + tableIdent.table + } else { + tableIdent.table.toLowerCase + } } } class SimpleCatalog(val conf: CatalystConf) extends Catalog { - val tables = new ConcurrentHashMap[String, LogicalPlan] - - override def registerTable( - tableIdentifier: Seq[String], - plan: LogicalPlan): Unit = { - checkTableIdentifier(tableIdentifier) - val tableIdent = processTableIdentifier(tableIdentifier) - tables.put(getDbTableName(tableIdent), plan) + private[this] val tables = new ConcurrentHashMap[String, LogicalPlan] + + override def registerTable(tableIdent: TableIdentifier, plan: LogicalPlan): Unit = { + tables.put(getTableName(tableIdent), plan) } - override def unregisterTable(tableIdentifier: Seq[String]): Unit = { - checkTableIdentifier(tableIdentifier) - val tableIdent = processTableIdentifier(tableIdentifier) - tables.remove(getDbTableName(tableIdent)) + override def unregisterTable(tableIdent: TableIdentifier): Unit = { + tables.remove(getTableName(tableIdent)) } override def unregisterAllTables(): Unit = { tables.clear() } - override def tableExists(tableIdentifier: Seq[String]): Boolean = { - checkTableIdentifier(tableIdentifier) - val tableIdent = processTableIdentifier(tableIdentifier) - tables.containsKey(getDbTableName(tableIdent)) + override def tableExists(tableIdent: TableIdentifier): Boolean = { + tables.containsKey(getTableName(tableIdent)) } override def lookupRelation( - tableIdentifier: Seq[String], + tableIdent: TableIdentifier, alias: Option[String] = None): LogicalPlan = { - checkTableIdentifier(tableIdentifier) - val tableIdent = processTableIdentifier(tableIdentifier) - val tableFullName = getDbTableName(tableIdent) - val table = tables.get(tableFullName) + val tableName = getTableName(tableIdent) + val table = tables.get(tableName) if (table == null) { - sys.error(s"Table Not Found: $tableFullName") + throw new NoSuchTableException } - val tableWithQualifiers = Subquery(tableIdent.last, table) + val tableWithQualifiers = Subquery(tableName, table) // If an alias was specified by the lookup, wrap the plan in a subquery so that attributes are // properly qualified with this alias. @@ -146,11 +114,7 @@ class SimpleCatalog(val conf: CatalystConf) extends Catalog { } override def getTables(databaseName: Option[String]): Seq[(String, Boolean)] = { - val result = ArrayBuffer.empty[(String, Boolean)] - for (name <- tables.keySet().asScala) { - result += ((name, true)) - } - result + tables.keySet().asScala.map(_ -> true).toSeq } override def refreshTable(tableIdent: TableIdentifier): Unit = { @@ -165,68 +129,50 @@ class SimpleCatalog(val conf: CatalystConf) extends Catalog { * lost when the JVM exits. */ trait OverrideCatalog extends Catalog { + private[this] val overrides = new ConcurrentHashMap[String, LogicalPlan] - // TODO: This doesn't work when the database changes... - val overrides = new mutable.HashMap[(Option[String], String), LogicalPlan]() - - abstract override def tableExists(tableIdentifier: Seq[String]): Boolean = { - val tableIdent = processTableIdentifier(tableIdentifier) - // A temporary tables only has a single part in the tableIdentifier. - val overriddenTable = if (tableIdentifier.length > 1) { - None: Option[LogicalPlan] + private def getOverriddenTable(tableIdent: TableIdentifier): Option[LogicalPlan] = { + if (tableIdent.database.isDefined) { + None } else { - overrides.get(getDBTable(tableIdent)) + Option(overrides.get(getTableName(tableIdent))) } - overriddenTable match { + } + + abstract override def tableExists(tableIdent: TableIdentifier): Boolean = { + getOverriddenTable(tableIdent) match { case Some(_) => true - case None => super.tableExists(tableIdentifier) + case None => super.tableExists(tableIdent) } } abstract override def lookupRelation( - tableIdentifier: Seq[String], + tableIdent: TableIdentifier, alias: Option[String] = None): LogicalPlan = { - val tableIdent = processTableIdentifier(tableIdentifier) - // A temporary tables only has a single part in the tableIdentifier. - val overriddenTable = if (tableIdentifier.length > 1) { - None: Option[LogicalPlan] - } else { - overrides.get(getDBTable(tableIdent)) - } - val tableWithQualifers = overriddenTable.map(r => Subquery(tableIdent.last, r)) + getOverriddenTable(tableIdent) match { + case Some(table) => + val tableName = getTableName(tableIdent) + val tableWithQualifiers = Subquery(tableName, table) - // If an alias was specified by the lookup, wrap the plan in a subquery so that attributes are - // properly qualified with this alias. - val withAlias = - tableWithQualifers.map(r => alias.map(a => Subquery(a, r)).getOrElse(r)) + // If an alias was specified by the lookup, wrap the plan in a sub-query so that attributes + // are properly qualified with this alias. + alias.map(a => Subquery(a, tableWithQualifiers)).getOrElse(tableWithQualifiers) - withAlias.getOrElse(super.lookupRelation(tableIdentifier, alias)) + case None => super.lookupRelation(tableIdent, alias) + } } abstract override def getTables(databaseName: Option[String]): Seq[(String, Boolean)] = { - // We always return all temporary tables. - val temporaryTables = overrides.map { - case ((_, tableName), _) => (tableName, true) - }.toSeq - - temporaryTables ++ super.getTables(databaseName) + overrides.keySet().asScala.map(_ -> true).toSeq ++ super.getTables(databaseName) } - override def registerTable( - tableIdentifier: Seq[String], - plan: LogicalPlan): Unit = { - checkTableIdentifier(tableIdentifier) - val tableIdent = processTableIdentifier(tableIdentifier) - overrides.put(getDBTable(tableIdent), plan) + override def registerTable(tableIdent: TableIdentifier, plan: LogicalPlan): Unit = { + overrides.put(getTableName(tableIdent), plan) } - override def unregisterTable(tableIdentifier: Seq[String]): Unit = { - // A temporary tables only has a single part in the tableIdentifier. - // If tableIdentifier has more than one parts, it is not a temporary table - // and we do not need to do anything at here. - if (tableIdentifier.length == 1) { - val tableIdent = processTableIdentifier(tableIdentifier) - overrides.remove(getDBTable(tableIdent)) + override def unregisterTable(tableIdent: TableIdentifier): Unit = { + if (tableIdent.database.isEmpty) { + overrides.remove(getTableName(tableIdent)) } } @@ -243,12 +189,12 @@ object EmptyCatalog extends Catalog { override val conf: CatalystConf = EmptyConf - override def tableExists(tableIdentifier: Seq[String]): Boolean = { + override def tableExists(tableIdent: TableIdentifier): Boolean = { throw new UnsupportedOperationException } override def lookupRelation( - tableIdentifier: Seq[String], + tableIdent: TableIdentifier, alias: Option[String] = None): LogicalPlan = { throw new UnsupportedOperationException } @@ -257,15 +203,17 @@ object EmptyCatalog extends Catalog { throw new UnsupportedOperationException } - override def registerTable(tableIdentifier: Seq[String], plan: LogicalPlan): Unit = { + override def registerTable(tableIdent: TableIdentifier, plan: LogicalPlan): Unit = { throw new UnsupportedOperationException } - override def unregisterTable(tableIdentifier: Seq[String]): Unit = { + override def unregisterTable(tableIdent: TableIdentifier): Unit = { throw new UnsupportedOperationException } - override def unregisterAllTables(): Unit = {} + override def unregisterAllTables(): Unit = { + throw new UnsupportedOperationException + } override def refreshTable(tableIdent: TableIdentifier): Unit = { throw new UnsupportedOperationException diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala index 43ee3191935eb..c97365003935e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.analysis import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback -import org.apache.spark.sql.catalyst.errors +import org.apache.spark.sql.catalyst.{TableIdentifier, errors} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.logical.LeafNode import org.apache.spark.sql.catalyst.trees.TreeNode @@ -36,11 +36,11 @@ class UnresolvedException[TreeType <: TreeNode[_]](tree: TreeType, function: Str * Holds the name of a relation that has yet to be looked up in a [[Catalog]]. */ case class UnresolvedRelation( - tableIdentifier: Seq[String], + tableIdentifier: TableIdentifier, alias: Option[String] = None) extends LeafNode { /** Returns a `.` separated name for this relation. */ - def tableName: String = tableIdentifier.mkString(".") + def tableName: String = tableIdentifier.unquotedString override def output: Seq[Attribute] = Nil diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala index 699c4cc63d09a..27b3cd84b3846 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala @@ -286,7 +286,8 @@ package object dsl { def insertInto(tableName: String, overwrite: Boolean = false): LogicalPlan = InsertIntoTable( - analysis.UnresolvedRelation(Seq(tableName)), Map.empty, logicalPlan, overwrite, false) + analysis.UnresolvedRelation(TableIdentifier(tableName)), + Map.empty, logicalPlan, overwrite, false) def analyze: LogicalPlan = EliminateSubQueries(analysis.SimpleAnalyzer.execute(logicalPlan)) } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala index 820b336aac759..ec05cfa63c5bf 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.catalyst.analysis +import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans._ import org.apache.spark.sql.catalyst.expressions._ @@ -53,32 +54,39 @@ class AnalysisSuite extends AnalysisTest { Project(testRelation.output, testRelation)) checkAnalysis( - Project(Seq(UnresolvedAttribute("TbL.a")), UnresolvedRelation(Seq("TaBlE"), Some("TbL"))), + Project(Seq(UnresolvedAttribute("TbL.a")), + UnresolvedRelation(TableIdentifier("TaBlE"), Some("TbL"))), Project(testRelation.output, testRelation)) assertAnalysisError( - Project(Seq(UnresolvedAttribute("tBl.a")), UnresolvedRelation(Seq("TaBlE"), Some("TbL"))), + Project(Seq(UnresolvedAttribute("tBl.a")), UnresolvedRelation( + TableIdentifier("TaBlE"), Some("TbL"))), Seq("cannot resolve")) checkAnalysis( - Project(Seq(UnresolvedAttribute("TbL.a")), UnresolvedRelation(Seq("TaBlE"), Some("TbL"))), + Project(Seq(UnresolvedAttribute("TbL.a")), UnresolvedRelation( + TableIdentifier("TaBlE"), Some("TbL"))), Project(testRelation.output, testRelation), caseSensitive = false) checkAnalysis( - Project(Seq(UnresolvedAttribute("tBl.a")), UnresolvedRelation(Seq("TaBlE"), Some("TbL"))), + Project(Seq(UnresolvedAttribute("tBl.a")), UnresolvedRelation( + TableIdentifier("TaBlE"), Some("TbL"))), Project(testRelation.output, testRelation), caseSensitive = false) } test("resolve relations") { - assertAnalysisError(UnresolvedRelation(Seq("tAbLe"), None), Seq("Table Not Found: tAbLe")) + assertAnalysisError( + UnresolvedRelation(TableIdentifier("tAbLe"), None), Seq("Table Not Found: tAbLe")) - checkAnalysis(UnresolvedRelation(Seq("TaBlE"), None), testRelation) + checkAnalysis(UnresolvedRelation(TableIdentifier("TaBlE"), None), testRelation) - checkAnalysis(UnresolvedRelation(Seq("tAbLe"), None), testRelation, caseSensitive = false) + checkAnalysis( + UnresolvedRelation(TableIdentifier("tAbLe"), None), testRelation, caseSensitive = false) - checkAnalysis(UnresolvedRelation(Seq("TaBlE"), None), testRelation, caseSensitive = false) + checkAnalysis( + UnresolvedRelation(TableIdentifier("TaBlE"), None), testRelation, caseSensitive = false) } test("divide should be casted into fractional types") { diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala index 53b3695a86be5..23861ed15da61 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala @@ -17,9 +17,10 @@ package org.apache.spark.sql.catalyst.analysis +import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.plans.PlanTest import org.apache.spark.sql.catalyst.plans.logical._ -import org.apache.spark.sql.catalyst.SimpleCatalystConf +import org.apache.spark.sql.catalyst.{TableIdentifier, SimpleCatalystConf} trait AnalysisTest extends PlanTest { @@ -30,8 +31,8 @@ trait AnalysisTest extends PlanTest { val caseSensitiveCatalog = new SimpleCatalog(caseSensitiveConf) val caseInsensitiveCatalog = new SimpleCatalog(caseInsensitiveConf) - caseSensitiveCatalog.registerTable(Seq("TaBlE"), TestRelations.testRelation) - caseInsensitiveCatalog.registerTable(Seq("TaBlE"), TestRelations.testRelation) + caseSensitiveCatalog.registerTable(TableIdentifier("TaBlE"), TestRelations.testRelation) + caseInsensitiveCatalog.registerTable(TableIdentifier("TaBlE"), TestRelations.testRelation) new Analyzer(caseSensitiveCatalog, EmptyFunctionRegistry, caseSensitiveConf) { override val extendedResolutionRules = EliminateSubQueries :: Nil @@ -67,8 +68,7 @@ trait AnalysisTest extends PlanTest { expectedErrors: Seq[String], caseSensitive: Boolean = true): Unit = { val analyzer = getAnalyzer(caseSensitive) - // todo: make sure we throw AnalysisException during analysis - val e = intercept[Exception] { + val e = intercept[AnalysisException] { analyzer.checkAnalysis(analyzer.execute(inputPlan)) } assert(expectedErrors.map(_.toLowerCase).forall(e.getMessage.toLowerCase.contains), diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala index b4ad618c23e39..40c4ae7920918 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala @@ -23,7 +23,7 @@ import org.apache.spark.SparkFunSuite import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.logical.{Union, Project, LocalRelation} import org.apache.spark.sql.types._ -import org.apache.spark.sql.catalyst.SimpleCatalystConf +import org.apache.spark.sql.catalyst.{TableIdentifier, SimpleCatalystConf} class DecimalPrecisionSuite extends SparkFunSuite with BeforeAndAfter { val conf = new SimpleCatalystConf(true) @@ -47,7 +47,7 @@ class DecimalPrecisionSuite extends SparkFunSuite with BeforeAndAfter { val b: Expression = UnresolvedAttribute("b") before { - catalog.registerTable(Seq("table"), relation) + catalog.registerTable(TableIdentifier("table"), relation) } private def checkType(expression: Expression, expectedType: DataType): Unit = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index 97a8b6518a832..eacdea2c1e5b3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -33,6 +33,7 @@ import org.apache.spark.sql.execution.datasources.parquet.ParquetRelation import org.apache.spark.sql.execution.datasources.{LogicalRelation, ResolvedDataSource} import org.apache.spark.sql.types.StructType import org.apache.spark.{Logging, Partition} +import org.apache.spark.sql.catalyst.{SqlParser, TableIdentifier} /** * :: Experimental :: @@ -287,7 +288,7 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging { * @since 1.4.0 */ def table(tableName: String): DataFrame = { - DataFrame(sqlContext, sqlContext.catalog.lookupRelation(Seq(tableName))) + DataFrame(sqlContext, sqlContext.catalog.lookupRelation(TableIdentifier(tableName))) } /////////////////////////////////////////////////////////////////////////////////////// diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index 03e973666e888..764510ab4b4bd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -171,7 +171,7 @@ final class DataFrameWriter private[sql](df: DataFrame) { val overwrite = mode == SaveMode.Overwrite df.sqlContext.executePlan( InsertIntoTable( - UnresolvedRelation(tableIdent.toSeq), + UnresolvedRelation(tableIdent), partitions.getOrElse(Map.empty[String, Option[String]]), df.logicalPlan, overwrite, @@ -201,7 +201,7 @@ final class DataFrameWriter private[sql](df: DataFrame) { } private def saveAsTable(tableIdent: TableIdentifier): Unit = { - val tableExists = df.sqlContext.catalog.tableExists(tableIdent.toSeq) + val tableExists = df.sqlContext.catalog.tableExists(tableIdent) (tableExists, mode) match { case (true, SaveMode.Ignore) => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index 3d5e35ab315eb..361eb576c567a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -714,7 +714,7 @@ class SQLContext private[sql]( * only during the lifetime of this instance of SQLContext. */ private[sql] def registerDataFrameAsTable(df: DataFrame, tableName: String): Unit = { - catalog.registerTable(Seq(tableName), df.logicalPlan) + catalog.registerTable(TableIdentifier(tableName), df.logicalPlan) } /** @@ -728,7 +728,7 @@ class SQLContext private[sql]( */ def dropTempTable(tableName: String): Unit = { cacheManager.tryUncacheQuery(table(tableName)) - catalog.unregisterTable(Seq(tableName)) + catalog.unregisterTable(TableIdentifier(tableName)) } /** @@ -795,7 +795,7 @@ class SQLContext private[sql]( } private def table(tableIdent: TableIdentifier): DataFrame = { - DataFrame(this, catalog.lookupRelation(tableIdent.toSeq)) + DataFrame(this, catalog.lookupRelation(tableIdent)) } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DDLParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DDLParser.scala index f7a88b98c0b48..446739d5b8a2c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DDLParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DDLParser.scala @@ -140,7 +140,7 @@ class DDLParser(parseQuery: String => LogicalPlan) protected lazy val describeTable: Parser[LogicalPlan] = (DESCRIBE ~> opt(EXTENDED)) ~ tableIdentifier ^^ { case e ~ tableIdent => - DescribeCommand(UnresolvedRelation(tableIdent.toSeq, None), e.isDefined) + DescribeCommand(UnresolvedRelation(tableIdent, None), e.isDefined) } protected lazy val refreshTable: Parser[LogicalPlan] = diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala index 31d6b75e13477..e7deeff13dc4d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala @@ -71,7 +71,6 @@ case class CreateTableUsing( * can analyze the logical plan that will be used to populate the table. * So, [[PreWriteCheck]] can detect cases that are not allowed. */ -// TODO: Use TableIdentifier instead of String for tableName (SPARK-10104). case class CreateTableUsingAsSelect( tableIdent: TableIdentifier, provider: String, @@ -93,7 +92,7 @@ case class CreateTempTableUsing( val resolved = ResolvedDataSource( sqlContext, userSpecifiedSchema, Array.empty[String], provider, options) sqlContext.catalog.registerTable( - tableIdent.toSeq, + tableIdent, DataFrame(sqlContext, LogicalRelation(resolved.relation)).logicalPlan) Seq.empty[Row] @@ -112,7 +111,7 @@ case class CreateTempTableUsingAsSelect( val df = DataFrame(sqlContext, query) val resolved = ResolvedDataSource(sqlContext, provider, partitionColumns, mode, options, df) sqlContext.catalog.registerTable( - tableIdent.toSeq, + tableIdent, DataFrame(sqlContext, LogicalRelation(resolved.relation)).logicalPlan) Seq.empty[Row] @@ -128,7 +127,7 @@ case class RefreshTable(tableIdent: TableIdentifier) // If this table is cached as a InMemoryColumnarRelation, drop the original // cached version and make the new version cached lazily. - val logicalPlan = sqlContext.catalog.lookupRelation(tableIdent.toSeq) + val logicalPlan = sqlContext.catalog.lookupRelation(tableIdent) // Use lookupCachedData directly since RefreshTable also takes databaseName. val isCached = sqlContext.cacheManager.lookupCachedData(logicalPlan).nonEmpty if (isCached) { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala index 8efc8016f94dd..b00e5680fef9e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala @@ -143,9 +143,9 @@ private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan => case CreateTableUsingAsSelect(tableIdent, _, _, partitionColumns, mode, _, query) => // When the SaveMode is Overwrite, we need to check if the table is an input table of // the query. If so, we will throw an AnalysisException to let users know it is not allowed. - if (mode == SaveMode.Overwrite && catalog.tableExists(tableIdent.toSeq)) { + if (mode == SaveMode.Overwrite && catalog.tableExists(tableIdent)) { // Need to remove SubQuery operator. - EliminateSubQueries(catalog.lookupRelation(tableIdent.toSeq)) match { + EliminateSubQueries(catalog.lookupRelation(tableIdent)) match { // Only do the check if the table is a data source table // (the relation is a BaseRelation). case l @ LogicalRelation(dest: BaseRelation, _) => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala index 356d4ff3fa837..fd566c8276bc1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql +import org.apache.spark.sql.catalyst.analysis.NoSuchTableException import org.apache.spark.sql.execution.PhysicalRDD import scala.concurrent.duration._ @@ -287,8 +288,7 @@ class CachedTableSuite extends QueryTest with SharedSQLContext { testData.select('key).registerTempTable("t1") sqlContext.table("t1") sqlContext.dropTempTable("t1") - assert( - intercept[RuntimeException](sqlContext.table("t1")).getMessage.startsWith("Table Not Found")) + intercept[NoSuchTableException](sqlContext.table("t1")) } test("Drops cached temporary table") { @@ -300,8 +300,7 @@ class CachedTableSuite extends QueryTest with SharedSQLContext { assert(sqlContext.isCached("t2")) sqlContext.dropTempTable("t1") - assert( - intercept[RuntimeException](sqlContext.table("t1")).getMessage.startsWith("Table Not Found")) + intercept[NoSuchTableException](sqlContext.table("t1")) assert(!sqlContext.isCached("t2")) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala index 7a027e13089e3..b1fb06815868c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation +import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.execution.joins._ import org.apache.spark.sql.test.SharedSQLContext @@ -359,8 +360,8 @@ class JoinSuite extends QueryTest with SharedSQLContext { upperCaseData.where('N <= 4).registerTempTable("left") upperCaseData.where('N >= 3).registerTempTable("right") - val left = UnresolvedRelation(Seq("left"), None) - val right = UnresolvedRelation(Seq("right"), None) + val left = UnresolvedRelation(TableIdentifier("left"), None) + val right = UnresolvedRelation(TableIdentifier("right"), None) checkAnswer( left.join(right, $"left.N" === $"right.N", "full"), diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ListTablesSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ListTablesSuite.scala index eab0fbb196eb6..5688f46e5e3d4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/ListTablesSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/ListTablesSuite.scala @@ -21,6 +21,7 @@ import org.scalatest.BeforeAndAfter import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types.{BooleanType, StringType, StructField, StructType} +import org.apache.spark.sql.catalyst.TableIdentifier class ListTablesSuite extends QueryTest with BeforeAndAfter with SharedSQLContext { import testImplicits._ @@ -32,7 +33,7 @@ class ListTablesSuite extends QueryTest with BeforeAndAfter with SharedSQLContex } after { - sqlContext.catalog.unregisterTable(Seq("ListTablesSuiteTable")) + sqlContext.catalog.unregisterTable(TableIdentifier("ListTablesSuiteTable")) } test("get all tables") { @@ -44,7 +45,7 @@ class ListTablesSuite extends QueryTest with BeforeAndAfter with SharedSQLContex sql("SHOW tables").filter("tableName = 'ListTablesSuiteTable'"), Row("ListTablesSuiteTable", true)) - sqlContext.catalog.unregisterTable(Seq("ListTablesSuiteTable")) + sqlContext.catalog.unregisterTable(TableIdentifier("ListTablesSuiteTable")) assert(sqlContext.tables().filter("tableName = 'ListTablesSuiteTable'").count() === 0) } @@ -57,7 +58,7 @@ class ListTablesSuite extends QueryTest with BeforeAndAfter with SharedSQLContex sql("show TABLES in DB").filter("tableName = 'ListTablesSuiteTable'"), Row("ListTablesSuiteTable", true)) - sqlContext.catalog.unregisterTable(Seq("ListTablesSuiteTable")) + sqlContext.catalog.unregisterTable(TableIdentifier("ListTablesSuiteTable")) assert(sqlContext.tables().filter("tableName = 'ListTablesSuiteTable'").count() === 0) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala index cc02ef81c9f8b..baff7f5752a75 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala @@ -22,7 +22,7 @@ import java.io.File import org.apache.hadoop.fs.Path import org.apache.spark.sql._ -import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.{TableIdentifier, InternalRow} import org.apache.spark.sql.catalyst.expressions.SpecificMutableRow import org.apache.spark.sql.execution.datasources.parquet.TestingUDT.{NestedStruct, NestedStructUDT} import org.apache.spark.sql.test.SharedSQLContext @@ -49,7 +49,7 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext sql("INSERT INTO TABLE t SELECT * FROM tmp") checkAnswer(sqlContext.table("t"), (data ++ data).map(Row.fromTuple)) } - sqlContext.catalog.unregisterTable(Seq("tmp")) + sqlContext.catalog.unregisterTable(TableIdentifier("tmp")) } test("overwriting") { @@ -59,7 +59,7 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext sql("INSERT OVERWRITE TABLE t SELECT * FROM tmp") checkAnswer(sqlContext.table("t"), data.map(Row.fromTuple)) } - sqlContext.catalog.unregisterTable(Seq("tmp")) + sqlContext.catalog.unregisterTable(TableIdentifier("tmp")) } test("self-join") { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index e620d7fb82af9..4d8a3f728e6b5 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -358,7 +358,7 @@ class HiveContext private[hive]( @Experimental def analyze(tableName: String) { val tableIdent = SqlParser.parseTableIdentifier(tableName) - val relation = EliminateSubQueries(catalog.lookupRelation(tableIdent.toSeq)) + val relation = EliminateSubQueries(catalog.lookupRelation(tableIdent)) relation match { case relation: MetastoreRelation => diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 1f8223e1ff507..5819cb9d08778 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -36,7 +36,7 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules._ -import org.apache.spark.sql.catalyst.{InternalRow, SqlParser, TableIdentifier} +import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} import org.apache.spark.sql.execution.datasources.parquet.ParquetRelation import org.apache.spark.sql.execution.datasources.{CreateTableUsingAsSelect, LogicalRelation, Partition => ParquetPartition, PartitionSpec, ResolvedDataSource} import org.apache.spark.sql.execution.{FileRelation, datasources} @@ -103,10 +103,19 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive /** Usages should lock on `this`. */ protected[hive] lazy val hiveWarehouse = new Warehouse(hive.hiveconf) - // TODO: Use this everywhere instead of tuples or databaseName, tableName,. /** A fully qualified identifier for a table (i.e., database.tableName) */ - case class QualifiedTableName(database: String, name: String) { - def toLowerCase: QualifiedTableName = QualifiedTableName(database.toLowerCase, name.toLowerCase) + case class QualifiedTableName(database: String, name: String) + + private def getQualifiedTableName(tableIdent: TableIdentifier) = { + QualifiedTableName( + tableIdent.database.getOrElse(client.currentDatabase).toLowerCase, + tableIdent.table.toLowerCase) + } + + private def getQualifiedTableName(hiveTable: HiveTable) = { + QualifiedTableName( + hiveTable.specifiedDatabase.getOrElse(client.currentDatabase).toLowerCase, + hiveTable.name.toLowerCase) } /** A cache of Spark SQL data source tables that have been accessed. */ @@ -179,33 +188,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive } def invalidateTable(tableIdent: TableIdentifier): Unit = { - val databaseName = tableIdent.database.getOrElse(client.currentDatabase) - val tableName = tableIdent.table - - cachedDataSourceTables.invalidate(QualifiedTableName(databaseName, tableName).toLowerCase) - } - - val caseSensitive: Boolean = false - - /** - * Creates a data source table (a table created with USING clause) in Hive's metastore. - * Returns true when the table has been created. Otherwise, false. - */ - // TODO: Remove this in SPARK-10104. - def createDataSourceTable( - tableName: String, - userSpecifiedSchema: Option[StructType], - partitionColumns: Array[String], - provider: String, - options: Map[String, String], - isExternal: Boolean): Unit = { - createDataSourceTable( - SqlParser.parseTableIdentifier(tableName), - userSpecifiedSchema, - partitionColumns, - provider, - options, - isExternal) + cachedDataSourceTables.invalidate(getQualifiedTableName(tableIdent)) } def createDataSourceTable( @@ -215,10 +198,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive provider: String, options: Map[String, String], isExternal: Boolean): Unit = { - val (dbName, tblName) = { - val database = tableIdent.database.getOrElse(client.currentDatabase) - processDatabaseAndTableName(database, tableIdent.table) - } + val QualifiedTableName(dbName, tblName) = getQualifiedTableName(tableIdent) val tableProperties = new mutable.HashMap[String, String] tableProperties.put("spark.sql.sources.provider", provider) @@ -311,7 +291,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive // TODO: Support persisting partitioned data source relations in Hive compatible format val qualifiedTableName = tableIdent.quotedString - val (hiveCompitiableTable, logMessage) = (maybeSerDe, dataSource.relation) match { + val (hiveCompatibleTable, logMessage) = (maybeSerDe, dataSource.relation) match { case (Some(serde), relation: HadoopFsRelation) if relation.paths.length == 1 && relation.partitionColumns.isEmpty => val hiveTable = newHiveCompatibleMetastoreTable(relation, serde) @@ -349,9 +329,9 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive (None, message) } - (hiveCompitiableTable, logMessage) match { + (hiveCompatibleTable, logMessage) match { case (Some(table), message) => - // We first try to save the metadata of the table in a Hive compatiable way. + // We first try to save the metadata of the table in a Hive compatible way. // If Hive throws an error, we fall back to save its metadata in the Spark SQL // specific way. try { @@ -374,48 +354,29 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive } } - def hiveDefaultTableFilePath(tableName: String): String = { - hiveDefaultTableFilePath(SqlParser.parseTableIdentifier(tableName)) - } - def hiveDefaultTableFilePath(tableIdent: TableIdentifier): String = { // Code based on: hiveWarehouse.getTablePath(currentDatabase, tableName) - val database = tableIdent.database.getOrElse(client.currentDatabase) - - new Path( - new Path(client.getDatabase(database).location), - tableIdent.table.toLowerCase).toString + val QualifiedTableName(dbName, tblName) = getQualifiedTableName(tableIdent) + new Path(new Path(client.getDatabase(dbName).location), tblName).toString } - def tableExists(tableIdentifier: Seq[String]): Boolean = { - val tableIdent = processTableIdentifier(tableIdentifier) - val databaseName = - tableIdent - .lift(tableIdent.size - 2) - .getOrElse(client.currentDatabase) - val tblName = tableIdent.last - client.getTableOption(databaseName, tblName).isDefined + override def tableExists(tableIdent: TableIdentifier): Boolean = { + val QualifiedTableName(dbName, tblName) = getQualifiedTableName(tableIdent) + client.getTableOption(dbName, tblName).isDefined } - def lookupRelation( - tableIdentifier: Seq[String], + override def lookupRelation( + tableIdent: TableIdentifier, alias: Option[String]): LogicalPlan = { - val tableIdent = processTableIdentifier(tableIdentifier) - val databaseName = tableIdent.lift(tableIdent.size - 2).getOrElse( - client.currentDatabase) - val tblName = tableIdent.last - val table = client.getTable(databaseName, tblName) + val qualifiedTableName = getQualifiedTableName(tableIdent) + val table = client.getTable(qualifiedTableName.database, qualifiedTableName.name) if (table.properties.get("spark.sql.sources.provider").isDefined) { - val dataSourceTable = - cachedDataSourceTables(QualifiedTableName(databaseName, tblName).toLowerCase) + val dataSourceTable = cachedDataSourceTables(qualifiedTableName) + val tableWithQualifiers = Subquery(qualifiedTableName.name, dataSourceTable) // Then, if alias is specified, wrap the table with a Subquery using the alias. // Otherwise, wrap the table with a Subquery using the table name. - val withAlias = - alias.map(a => Subquery(a, dataSourceTable)).getOrElse( - Subquery(tableIdent.last, dataSourceTable)) - - withAlias + alias.map(a => Subquery(a, tableWithQualifiers)).getOrElse(tableWithQualifiers) } else if (table.tableType == VirtualView) { val viewText = table.viewText.getOrElse(sys.error("Invalid view without text.")) alias match { @@ -425,7 +386,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive case Some(aliasText) => Subquery(aliasText, HiveQl.createPlan(viewText)) } } else { - MetastoreRelation(databaseName, tblName, alias)(table)(hive) + MetastoreRelation(qualifiedTableName.database, qualifiedTableName.name, alias)(table)(hive) } } @@ -524,26 +485,6 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive client.listTables(db).map(tableName => (tableName, false)) } - protected def processDatabaseAndTableName( - databaseName: Option[String], - tableName: String): (Option[String], String) = { - if (!caseSensitive) { - (databaseName.map(_.toLowerCase), tableName.toLowerCase) - } else { - (databaseName, tableName) - } - } - - protected def processDatabaseAndTableName( - databaseName: String, - tableName: String): (String, String) = { - if (!caseSensitive) { - (databaseName.toLowerCase, tableName.toLowerCase) - } else { - (databaseName, tableName) - } - } - /** * When scanning or writing to non-partitioned Metastore Parquet tables, convert them to Parquet * data source relations for better performance. @@ -597,8 +538,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive "It is not allowed to define a view with both IF NOT EXISTS and OR REPLACE.") } - val (dbName, tblName) = processDatabaseAndTableName( - table.specifiedDatabase.getOrElse(client.currentDatabase), table.name) + val QualifiedTableName(dbName, tblName) = getQualifiedTableName(table) execution.CreateViewAsSelect( table.copy( @@ -636,7 +576,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive val mode = if (allowExisting) SaveMode.Ignore else SaveMode.ErrorIfExists CreateTableUsingAsSelect( TableIdentifier(desc.name), - hive.conf.defaultDataSourceName, + conf.defaultDataSourceName, temporary = false, Array.empty[String], mode, @@ -652,9 +592,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive table } - val (dbName, tblName) = - processDatabaseAndTableName( - desc.specifiedDatabase.getOrElse(client.currentDatabase), desc.name) + val QualifiedTableName(dbName, tblName) = getQualifiedTableName(table) execution.CreateTableAsSelect( desc.copy( @@ -712,7 +650,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive * UNIMPLEMENTED: It needs to be decided how we will persist in-memory tables to the metastore. * For now, if this functionality is desired mix in the in-memory [[OverrideCatalog]]. */ - override def registerTable(tableIdentifier: Seq[String], plan: LogicalPlan): Unit = { + override def registerTable(tableIdent: TableIdentifier, plan: LogicalPlan): Unit = { throw new UnsupportedOperationException } @@ -720,7 +658,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive * UNIMPLEMENTED: It needs to be decided how we will persist in-memory tables to the metastore. * For now, if this functionality is desired mix in the in-memory [[OverrideCatalog]]. */ - override def unregisterTable(tableIdentifier: Seq[String]): Unit = { + override def unregisterTable(tableIdent: TableIdentifier): Unit = { throw new UnsupportedOperationException } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala index 1d505019400bc..d4ff5cc0f12a2 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala @@ -41,6 +41,7 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.{logical, _} import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.trees.CurrentOrigin +import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.execution.ExplainCommand import org.apache.spark.sql.execution.datasources.DescribeCommand import org.apache.spark.sql.hive.HiveShim._ @@ -442,24 +443,12 @@ private[hive] object HiveQl extends Logging { throw new NotImplementedError(s"No parse rules for StructField:\n ${dumpTree(a).toString} ") } - protected def extractDbNameTableName(tableNameParts: Node): (Option[String], String) = { - val (db, tableName) = - tableNameParts.getChildren.asScala.map { - case Token(part, Nil) => cleanIdentifier(part) - } match { - case Seq(tableOnly) => (None, tableOnly) - case Seq(databaseName, table) => (Some(databaseName), table) - } - - (db, tableName) - } - - protected def extractTableIdent(tableNameParts: Node): Seq[String] = { + protected def extractTableIdent(tableNameParts: Node): TableIdentifier = { tableNameParts.getChildren.asScala.map { case Token(part, Nil) => cleanIdentifier(part) } match { - case Seq(tableOnly) => Seq(tableOnly) - case Seq(databaseName, table) => Seq(databaseName, table) + case Seq(tableOnly) => TableIdentifier(tableOnly) + case Seq(databaseName, table) => TableIdentifier(table, Some(databaseName)) case other => sys.error("Hive only supports tables names like 'tableName' " + s"or 'databaseName.tableName', found '$other'") } @@ -518,13 +507,13 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C properties: Map[String, String], allowExist: Boolean, replace: Boolean): CreateViewAsSelect = { - val (db, viewName) = extractDbNameTableName(viewNameParts) + val TableIdentifier(viewName, dbName) = extractTableIdent(viewNameParts) val originalText = context.getTokenRewriteStream .toString(query.getTokenStartIndex, query.getTokenStopIndex) val tableDesc = HiveTable( - specifiedDatabase = db, + specifiedDatabase = dbName, name = viewName, schema = schema, partitionColumns = Seq.empty[HiveColumn], @@ -611,7 +600,8 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C case tableName => // It is describing a table with the format like "describe table". DescribeCommand( - UnresolvedRelation(Seq(tableName.getText), None), isExtended = extended.isDefined) + UnresolvedRelation(TableIdentifier(tableName.getText), None), + isExtended = extended.isDefined) } } // All other cases. @@ -716,12 +706,12 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C "TOK_TABLELOCATION", "TOK_TABLEPROPERTIES"), children) - val (db, tableName) = extractDbNameTableName(tableNameParts) + val TableIdentifier(tblName, dbName) = extractTableIdent(tableNameParts) // TODO add bucket support var tableDesc: HiveTable = HiveTable( - specifiedDatabase = db, - name = tableName, + specifiedDatabase = dbName, + name = tblName, schema = Seq.empty[HiveColumn], partitionColumns = Seq.empty[HiveColumn], properties = Map[String, String](), @@ -1264,15 +1254,7 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C nonAliasClauses) } - val tableIdent = - tableNameParts.getChildren.asScala.map { - case Token(part, Nil) => cleanIdentifier(part) - } match { - case Seq(tableOnly) => Seq(tableOnly) - case Seq(databaseName, table) => Seq(databaseName, table) - case other => sys.error("Hive only supports tables names like 'tableName' " + - s"or 'databaseName.tableName', found '$other'") - } + val tableIdent = extractTableIdent(tableNameParts) val alias = aliasClause.map { case Token(a, Nil) => cleanIdentifier(a) } val relation = UnresolvedRelation(tableIdent, alias) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala index 8422287e177e5..e72a60b42e653 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.hive.execution +import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan} import org.apache.spark.sql.execution.RunnableCommand import org.apache.spark.sql.hive.client.{HiveColumn, HiveTable} @@ -37,8 +38,7 @@ case class CreateTableAsSelect( allowExisting: Boolean) extends RunnableCommand { - def database: String = tableDesc.database - def tableName: String = tableDesc.name + val tableIdentifier = TableIdentifier(tableDesc.name, Some(tableDesc.database)) override def children: Seq[LogicalPlan] = Seq(query) @@ -72,18 +72,18 @@ case class CreateTableAsSelect( hiveContext.catalog.client.createTable(withSchema) // Get the Metastore Relation - hiveContext.catalog.lookupRelation(Seq(database, tableName), None) match { + hiveContext.catalog.lookupRelation(tableIdentifier, None) match { case r: MetastoreRelation => r } } // TODO ideally, we should get the output data ready first and then // add the relation into catalog, just in case of failure occurs while data // processing. - if (hiveContext.catalog.tableExists(Seq(database, tableName))) { + if (hiveContext.catalog.tableExists(tableIdentifier)) { if (allowExisting) { // table already exists, will do nothing, to keep consistent with Hive } else { - throw new AnalysisException(s"$database.$tableName already exists.") + throw new AnalysisException(s"$tableIdentifier already exists.") } } else { hiveContext.executePlan(InsertIntoTable(metastoreRelation, Map(), query, true, false)).toRdd @@ -93,6 +93,6 @@ case class CreateTableAsSelect( } override def argString: String = { - s"[Database:$database, TableName: $tableName, InsertIntoHiveTable]" + s"[Database:${tableDesc.database}}, TableName: ${tableDesc.name}, InsertIntoHiveTable]" } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala index 2b504ac974f07..2c81115ee4fed 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.hive.execution +import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.hive.{HiveMetastoreTypes, HiveContext} import org.apache.spark.sql.{AnalysisException, Row, SQLContext} @@ -38,18 +39,18 @@ private[hive] case class CreateViewAsSelect( assert(tableDesc.schema == Nil || tableDesc.schema.length == childSchema.length) assert(tableDesc.viewText.isDefined) + val tableIdentifier = TableIdentifier(tableDesc.name, Some(tableDesc.database)) + override def run(sqlContext: SQLContext): Seq[Row] = { val hiveContext = sqlContext.asInstanceOf[HiveContext] - val database = tableDesc.database - val viewName = tableDesc.name - if (hiveContext.catalog.tableExists(Seq(database, viewName))) { + if (hiveContext.catalog.tableExists(tableIdentifier)) { if (allowExisting) { // view already exists, will do nothing, to keep consistent with Hive } else if (orReplace) { hiveContext.catalog.client.alertView(prepareTable()) } else { - throw new AnalysisException(s"View $database.$viewName already exists. " + + throw new AnalysisException(s"View $tableIdentifier already exists. " + "If you want to update the view definition, please use ALTER VIEW AS or " + "CREATE OR REPLACE VIEW AS") } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala index 51ec92afd06ed..94210a5394f9b 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala @@ -71,7 +71,7 @@ case class DropTable( } hiveContext.invalidateTable(tableName) hiveContext.runSqlHive(s"DROP TABLE $ifExistsClause$tableName") - hiveContext.catalog.unregisterTable(Seq(tableName)) + hiveContext.catalog.unregisterTable(TableIdentifier(tableName)) Seq.empty[Row] } } @@ -103,7 +103,6 @@ case class AddFile(path: String) extends RunnableCommand { } } -// TODO: Use TableIdentifier instead of String for tableName (SPARK-10104). private[hive] case class CreateMetastoreDataSource( tableIdent: TableIdentifier, @@ -131,7 +130,7 @@ case class CreateMetastoreDataSource( val tableName = tableIdent.unquotedString val hiveContext = sqlContext.asInstanceOf[HiveContext] - if (hiveContext.catalog.tableExists(tableIdent.toSeq)) { + if (hiveContext.catalog.tableExists(tableIdent)) { if (allowExisting) { return Seq.empty[Row] } else { @@ -160,7 +159,6 @@ case class CreateMetastoreDataSource( } } -// TODO: Use TableIdentifier instead of String for tableName (SPARK-10104). private[hive] case class CreateMetastoreDataSourceAsSelect( tableIdent: TableIdentifier, @@ -198,7 +196,7 @@ case class CreateMetastoreDataSourceAsSelect( } var existingSchema = None: Option[StructType] - if (sqlContext.catalog.tableExists(tableIdent.toSeq)) { + if (sqlContext.catalog.tableExists(tableIdent)) { // Check if we need to throw an exception or just return. mode match { case SaveMode.ErrorIfExists => @@ -215,7 +213,7 @@ case class CreateMetastoreDataSourceAsSelect( val resolved = ResolvedDataSource( sqlContext, Some(query.schema.asNullable), partitionColumns, provider, optionsWithPath) val createdRelation = LogicalRelation(resolved.relation) - EliminateSubQueries(sqlContext.catalog.lookupRelation(tableIdent.toSeq)) match { + EliminateSubQueries(sqlContext.catalog.lookupRelation(tableIdent)) match { case l @ LogicalRelation(_: InsertableRelation | _: HadoopFsRelation, _) => if (l.relation != createdRelation.relation) { val errorDescription = diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala index ff39ccb7c1ea5..6883d305cbead 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala @@ -191,7 +191,7 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) { // Make sure any test tables referenced are loaded. val referencedTables = describedTables ++ - logical.collect { case UnresolvedRelation(tableIdent, _) => tableIdent.last } + logical.collect { case UnresolvedRelation(tableIdent, _) => tableIdent.table } val referencedTestTables = referencedTables.filter(testTables.contains) logDebug(s"Query references test tables: ${referencedTestTables.mkString(", ")}") referencedTestTables.foreach(loadTestTable) diff --git a/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java b/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java index c8d272794d10b..8c4af1b8eaf44 100644 --- a/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java +++ b/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java @@ -26,7 +26,6 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.spark.sql.SaveMode; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -41,6 +40,8 @@ import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.catalyst.TableIdentifier; import org.apache.spark.util.Utils; public class JavaMetastoreDataSourcesSuite { @@ -71,7 +72,8 @@ public void setUp() throws IOException { if (path.exists()) { path.delete(); } - hiveManagedPath = new Path(sqlContext.catalog().hiveDefaultTableFilePath("javaSavedTable")); + hiveManagedPath = new Path(sqlContext.catalog().hiveDefaultTableFilePath( + new TableIdentifier("javaSavedTable"))); fs = hiveManagedPath.getFileSystem(sc.hadoopConfiguration()); if (fs.exists(hiveManagedPath)){ fs.delete(hiveManagedPath, true); diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala index 579631df772b5..183aca29cf98d 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.hive import org.scalatest.BeforeAndAfterAll import org.apache.spark.sql.hive.test.TestHiveSingleton +import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.QueryTest import org.apache.spark.sql.Row @@ -31,14 +32,14 @@ class ListTablesSuite extends QueryTest with TestHiveSingleton with BeforeAndAft override def beforeAll(): Unit = { // The catalog in HiveContext is a case insensitive one. - catalog.registerTable(Seq("ListTablesSuiteTable"), df.logicalPlan) + catalog.registerTable(TableIdentifier("ListTablesSuiteTable"), df.logicalPlan) sql("CREATE TABLE HiveListTablesSuiteTable (key int, value string)") sql("CREATE DATABASE IF NOT EXISTS ListTablesSuiteDB") sql("CREATE TABLE ListTablesSuiteDB.HiveInDBListTablesSuiteTable (key int, value string)") } override def afterAll(): Unit = { - catalog.unregisterTable(Seq("ListTablesSuiteTable")) + catalog.unregisterTable(TableIdentifier("ListTablesSuiteTable")) sql("DROP TABLE IF EXISTS HiveListTablesSuiteTable") sql("DROP TABLE IF EXISTS ListTablesSuiteDB.HiveInDBListTablesSuiteTable") sql("DROP DATABASE IF EXISTS ListTablesSuiteDB") diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala index d3565380005a0..d2928876887bd 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala @@ -30,6 +30,7 @@ import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.execution.datasources.parquet.ParquetRelation import org.apache.spark.sql.test.SQLTestUtils import org.apache.spark.sql.types._ +import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.util.Utils /** @@ -367,7 +368,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv |) """.stripMargin) - val expectedPath = catalog.hiveDefaultTableFilePath("ctasJsonTable") + val expectedPath = catalog.hiveDefaultTableFilePath(TableIdentifier("ctasJsonTable")) val filesystemPath = new Path(expectedPath) val fs = filesystemPath.getFileSystem(sparkContext.hadoopConfiguration) if (fs.exists(filesystemPath)) fs.delete(filesystemPath, true) @@ -472,7 +473,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv // Drop table will also delete the data. sql("DROP TABLE savedJsonTable") intercept[IOException] { - read.json(catalog.hiveDefaultTableFilePath("savedJsonTable")) + read.json(catalog.hiveDefaultTableFilePath(TableIdentifier("savedJsonTable"))) } } @@ -703,7 +704,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv // Manually create a metastore data source table. catalog.createDataSourceTable( - tableName = "wide_schema", + tableIdent = TableIdentifier("wide_schema"), userSpecifiedSchema = Some(schema), partitionColumns = Array.empty[String], provider = "json", @@ -733,7 +734,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv "EXTERNAL" -> "FALSE"), tableType = ManagedTable, serdeProperties = Map( - "path" -> catalog.hiveDefaultTableFilePath(tableName))) + "path" -> catalog.hiveDefaultTableFilePath(TableIdentifier(tableName)))) catalog.client.createTable(hiveTable) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala index 6a692d6fce562..9bb32f11b76bd 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.hive import scala.reflect.ClassTag import org.apache.spark.sql.{Row, SQLConf, QueryTest} +import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.execution.joins._ import org.apache.spark.sql.hive.execution._ import org.apache.spark.sql.hive.test.TestHiveSingleton @@ -68,7 +69,7 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton { test("analyze MetastoreRelations") { def queryTotalSize(tableName: String): BigInt = - hiveContext.catalog.lookupRelation(Seq(tableName)).statistics.sizeInBytes + hiveContext.catalog.lookupRelation(TableIdentifier(tableName)).statistics.sizeInBytes // Non-partitioned table sql("CREATE TABLE analyzeTable (key STRING, value STRING)").collect() @@ -115,7 +116,7 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton { intercept[UnsupportedOperationException] { hiveContext.analyze("tempTable") } - hiveContext.catalog.unregisterTable(Seq("tempTable")) + hiveContext.catalog.unregisterTable(TableIdentifier("tempTable")) } test("estimates the size of a test MetastoreRelation") { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index 6aa34605b05a8..c929ba50680bc 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -22,7 +22,7 @@ import java.sql.{Date, Timestamp} import scala.collection.JavaConverters._ import org.apache.spark.sql._ -import org.apache.spark.sql.catalyst.DefaultParserDialect +import org.apache.spark.sql.catalyst.{TableIdentifier, DefaultParserDialect} import org.apache.spark.sql.catalyst.analysis.{FunctionRegistry, EliminateSubQueries} import org.apache.spark.sql.catalyst.errors.DialectException import org.apache.spark.sql.execution.datasources.LogicalRelation @@ -266,7 +266,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { test("CTAS without serde") { def checkRelation(tableName: String, isDataSourceParquet: Boolean): Unit = { - val relation = EliminateSubQueries(catalog.lookupRelation(Seq(tableName))) + val relation = EliminateSubQueries(catalog.lookupRelation(TableIdentifier(tableName))) relation match { case LogicalRelation(r: ParquetRelation, _) => if (!isDataSourceParquet) { @@ -723,7 +723,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { (1 to 100).par.map { i => val tableName = s"SPARK_6618_table_$i" sql(s"CREATE TABLE $tableName (col1 string)") - catalog.lookupRelation(Seq(tableName)) + catalog.lookupRelation(TableIdentifier(tableName)) table(tableName) tables() sql(s"DROP TABLE $tableName") diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala index 5eb39b1129701..7efeab528c1dd 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala @@ -24,6 +24,7 @@ import org.apache.hadoop.hive.ql.io.orc.CompressionKind import org.scalatest.BeforeAndAfterAll import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.hive.test.TestHive._ import org.apache.spark.sql.hive.test.TestHive.implicits._ @@ -218,7 +219,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest { sql("INSERT INTO TABLE t SELECT * FROM tmp") checkAnswer(table("t"), (data ++ data).map(Row.fromTuple)) } - catalog.unregisterTable(Seq("tmp")) + catalog.unregisterTable(TableIdentifier("tmp")) } test("overwriting") { @@ -228,7 +229,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest { sql("INSERT OVERWRITE TABLE t SELECT * FROM tmp") checkAnswer(table("t"), data.map(Row.fromTuple)) } - catalog.unregisterTable(Seq("tmp")) + catalog.unregisterTable(TableIdentifier("tmp")) } test("self-join") {