Skip to content

Commit

Permalink
[SPARK-10104] [SQL] Consolidate different forms of table identifiers
Browse files Browse the repository at this point in the history
Right now, we have QualifiedTableName, TableIdentifier, and Seq[String] to represent table identifiers. We should only have one form and TableIdentifier is the best one because it provides methods to get table name, database name, return unquoted string, and return quoted string.

Author: Wenchen Fan <[email protected]>
Author: Wenchen Fan <[email protected]>

Closes #8453 from cloud-fan/table-name.
  • Loading branch information
cloud-fan authored and davies committed Oct 14, 2015
1 parent 9a430a0 commit 56d7da1
Show file tree
Hide file tree
Showing 32 changed files with 212 additions and 327 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ object SqlParser extends AbstractSparkSQLParser with DataTypeParser {
joinedRelation | relationFactor

protected lazy val relationFactor: Parser[LogicalPlan] =
( rep1sep(ident, ".") ~ (opt(AS) ~> opt(ident)) ^^ {
( tableIdentifier ~ (opt(AS) ~> opt(ident)) ^^ {
case tableIdent ~ alias => UnresolvedRelation(tableIdent, alias)
}
| ("(" ~> start <~ ")") ~ (AS.? ~> ident) ^^ { case s ~ a => Subquery(a, s) }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,16 @@ package org.apache.spark.sql.catalyst
/**
* Identifies a `table` in `database`. If `database` is not defined, the current database is used.
*/
private[sql] case class TableIdentifier(table: String, database: Option[String] = None) {
def withDatabase(database: String): TableIdentifier = this.copy(database = Some(database))

def toSeq: Seq[String] = database.toSeq :+ table
private[sql] case class TableIdentifier(table: String, database: Option[String]) {
def this(table: String) = this(table, None)

override def toString: String = quotedString

def quotedString: String = toSeq.map("`" + _ + "`").mkString(".")
def quotedString: String = database.map(db => s"`$db`.`$table`").getOrElse(s"`$table`")

def unquotedString: String = database.map(db => s"$db.$table").getOrElse(table)
}

def unquotedString: String = toSeq.mkString(".")
private[sql] object TableIdentifier {
def apply(tableName: String): TableIdentifier = new TableIdentifier(tableName)
}
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ class Analyzer(
// here use the CTE definition first, check table name only and ignore database name
// see https://github.com/apache/spark/pull/4929#discussion_r27186638 for more info
case u : UnresolvedRelation =>
val substituted = cteRelations.get(u.tableIdentifier.last).map { relation =>
val substituted = cteRelations.get(u.tableIdentifier.table).map { relation =>
val withAlias = u.alias.map(Subquery(_, relation))
withAlias.getOrElse(relation)
}
Expand Down Expand Up @@ -257,7 +257,7 @@ class Analyzer(
catalog.lookupRelation(u.tableIdentifier, u.alias)
} catch {
case _: NoSuchTableException =>
u.failAnalysis(s"no such table ${u.tableName}")
u.failAnalysis(s"Table Not Found: ${u.tableName}")
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,9 @@ trait Catalog {

val conf: CatalystConf

def tableExists(tableIdentifier: Seq[String]): Boolean
def tableExists(tableIdent: TableIdentifier): Boolean

def lookupRelation(
tableIdentifier: Seq[String],
alias: Option[String] = None): LogicalPlan
def lookupRelation(tableIdent: TableIdentifier, alias: Option[String] = None): LogicalPlan

/**
* Returns tuples of (tableName, isTemporary) for all tables in the given database.
Expand All @@ -56,101 +54,67 @@ trait Catalog {

def refreshTable(tableIdent: TableIdentifier): Unit

// TODO: Refactor it in the work of SPARK-10104
def registerTable(tableIdentifier: Seq[String], plan: LogicalPlan): Unit
def registerTable(tableIdent: TableIdentifier, plan: LogicalPlan): Unit

// TODO: Refactor it in the work of SPARK-10104
def unregisterTable(tableIdentifier: Seq[String]): Unit
def unregisterTable(tableIdent: TableIdentifier): Unit

def unregisterAllTables(): Unit

// TODO: Refactor it in the work of SPARK-10104
protected def processTableIdentifier(tableIdentifier: Seq[String]): Seq[String] = {
if (conf.caseSensitiveAnalysis) {
tableIdentifier
} else {
tableIdentifier.map(_.toLowerCase)
}
}

// TODO: Refactor it in the work of SPARK-10104
protected def getDbTableName(tableIdent: Seq[String]): String = {
val size = tableIdent.size
if (size <= 2) {
tableIdent.mkString(".")
} else {
tableIdent.slice(size - 2, size).mkString(".")
}
}

// TODO: Refactor it in the work of SPARK-10104
protected def getDBTable(tableIdent: Seq[String]) : (Option[String], String) = {
(tableIdent.lift(tableIdent.size - 2), tableIdent.last)
}

/**
* It is not allowed to specifiy database name for tables stored in [[SimpleCatalog]].
* We use this method to check it.
* Get the table name of TableIdentifier for temporary tables.
*/
protected def checkTableIdentifier(tableIdentifier: Seq[String]): Unit = {
if (tableIdentifier.length > 1) {
protected def getTableName(tableIdent: TableIdentifier): String = {
// It is not allowed to specify database name for temporary tables.
// We check it here and throw exception if database is defined.
if (tableIdent.database.isDefined) {
throw new AnalysisException("Specifying database name or other qualifiers are not allowed " +
"for temporary tables. If the table name has dots (.) in it, please quote the " +
"table name with backticks (`).")
}
if (conf.caseSensitiveAnalysis) {
tableIdent.table
} else {
tableIdent.table.toLowerCase
}
}
}

class SimpleCatalog(val conf: CatalystConf) extends Catalog {
val tables = new ConcurrentHashMap[String, LogicalPlan]

override def registerTable(
tableIdentifier: Seq[String],
plan: LogicalPlan): Unit = {
checkTableIdentifier(tableIdentifier)
val tableIdent = processTableIdentifier(tableIdentifier)
tables.put(getDbTableName(tableIdent), plan)
private[this] val tables = new ConcurrentHashMap[String, LogicalPlan]

override def registerTable(tableIdent: TableIdentifier, plan: LogicalPlan): Unit = {
tables.put(getTableName(tableIdent), plan)
}

override def unregisterTable(tableIdentifier: Seq[String]): Unit = {
checkTableIdentifier(tableIdentifier)
val tableIdent = processTableIdentifier(tableIdentifier)
tables.remove(getDbTableName(tableIdent))
override def unregisterTable(tableIdent: TableIdentifier): Unit = {
tables.remove(getTableName(tableIdent))
}

override def unregisterAllTables(): Unit = {
tables.clear()
}

override def tableExists(tableIdentifier: Seq[String]): Boolean = {
checkTableIdentifier(tableIdentifier)
val tableIdent = processTableIdentifier(tableIdentifier)
tables.containsKey(getDbTableName(tableIdent))
override def tableExists(tableIdent: TableIdentifier): Boolean = {
tables.containsKey(getTableName(tableIdent))
}

override def lookupRelation(
tableIdentifier: Seq[String],
tableIdent: TableIdentifier,
alias: Option[String] = None): LogicalPlan = {
checkTableIdentifier(tableIdentifier)
val tableIdent = processTableIdentifier(tableIdentifier)
val tableFullName = getDbTableName(tableIdent)
val table = tables.get(tableFullName)
val tableName = getTableName(tableIdent)
val table = tables.get(tableName)
if (table == null) {
sys.error(s"Table Not Found: $tableFullName")
throw new NoSuchTableException
}
val tableWithQualifiers = Subquery(tableIdent.last, table)
val tableWithQualifiers = Subquery(tableName, table)

// If an alias was specified by the lookup, wrap the plan in a subquery so that attributes are
// properly qualified with this alias.
alias.map(a => Subquery(a, tableWithQualifiers)).getOrElse(tableWithQualifiers)
}

override def getTables(databaseName: Option[String]): Seq[(String, Boolean)] = {
val result = ArrayBuffer.empty[(String, Boolean)]
for (name <- tables.keySet().asScala) {
result += ((name, true))
}
result
tables.keySet().asScala.map(_ -> true).toSeq
}

override def refreshTable(tableIdent: TableIdentifier): Unit = {
Expand All @@ -165,68 +129,50 @@ class SimpleCatalog(val conf: CatalystConf) extends Catalog {
* lost when the JVM exits.
*/
trait OverrideCatalog extends Catalog {
private[this] val overrides = new ConcurrentHashMap[String, LogicalPlan]

// TODO: This doesn't work when the database changes...
val overrides = new mutable.HashMap[(Option[String], String), LogicalPlan]()

abstract override def tableExists(tableIdentifier: Seq[String]): Boolean = {
val tableIdent = processTableIdentifier(tableIdentifier)
// A temporary tables only has a single part in the tableIdentifier.
val overriddenTable = if (tableIdentifier.length > 1) {
None: Option[LogicalPlan]
private def getOverriddenTable(tableIdent: TableIdentifier): Option[LogicalPlan] = {
if (tableIdent.database.isDefined) {
None
} else {
overrides.get(getDBTable(tableIdent))
Option(overrides.get(getTableName(tableIdent)))
}
overriddenTable match {
}

abstract override def tableExists(tableIdent: TableIdentifier): Boolean = {
getOverriddenTable(tableIdent) match {
case Some(_) => true
case None => super.tableExists(tableIdentifier)
case None => super.tableExists(tableIdent)
}
}

abstract override def lookupRelation(
tableIdentifier: Seq[String],
tableIdent: TableIdentifier,
alias: Option[String] = None): LogicalPlan = {
val tableIdent = processTableIdentifier(tableIdentifier)
// A temporary tables only has a single part in the tableIdentifier.
val overriddenTable = if (tableIdentifier.length > 1) {
None: Option[LogicalPlan]
} else {
overrides.get(getDBTable(tableIdent))
}
val tableWithQualifers = overriddenTable.map(r => Subquery(tableIdent.last, r))
getOverriddenTable(tableIdent) match {
case Some(table) =>
val tableName = getTableName(tableIdent)
val tableWithQualifiers = Subquery(tableName, table)

// If an alias was specified by the lookup, wrap the plan in a subquery so that attributes are
// properly qualified with this alias.
val withAlias =
tableWithQualifers.map(r => alias.map(a => Subquery(a, r)).getOrElse(r))
// If an alias was specified by the lookup, wrap the plan in a sub-query so that attributes
// are properly qualified with this alias.
alias.map(a => Subquery(a, tableWithQualifiers)).getOrElse(tableWithQualifiers)

withAlias.getOrElse(super.lookupRelation(tableIdentifier, alias))
case None => super.lookupRelation(tableIdent, alias)
}
}

abstract override def getTables(databaseName: Option[String]): Seq[(String, Boolean)] = {
// We always return all temporary tables.
val temporaryTables = overrides.map {
case ((_, tableName), _) => (tableName, true)
}.toSeq

temporaryTables ++ super.getTables(databaseName)
overrides.keySet().asScala.map(_ -> true).toSeq ++ super.getTables(databaseName)
}

override def registerTable(
tableIdentifier: Seq[String],
plan: LogicalPlan): Unit = {
checkTableIdentifier(tableIdentifier)
val tableIdent = processTableIdentifier(tableIdentifier)
overrides.put(getDBTable(tableIdent), plan)
override def registerTable(tableIdent: TableIdentifier, plan: LogicalPlan): Unit = {
overrides.put(getTableName(tableIdent), plan)
}

override def unregisterTable(tableIdentifier: Seq[String]): Unit = {
// A temporary tables only has a single part in the tableIdentifier.
// If tableIdentifier has more than one parts, it is not a temporary table
// and we do not need to do anything at here.
if (tableIdentifier.length == 1) {
val tableIdent = processTableIdentifier(tableIdentifier)
overrides.remove(getDBTable(tableIdent))
override def unregisterTable(tableIdent: TableIdentifier): Unit = {
if (tableIdent.database.isEmpty) {
overrides.remove(getTableName(tableIdent))
}
}

Expand All @@ -243,12 +189,12 @@ object EmptyCatalog extends Catalog {

override val conf: CatalystConf = EmptyConf

override def tableExists(tableIdentifier: Seq[String]): Boolean = {
override def tableExists(tableIdent: TableIdentifier): Boolean = {
throw new UnsupportedOperationException
}

override def lookupRelation(
tableIdentifier: Seq[String],
tableIdent: TableIdentifier,
alias: Option[String] = None): LogicalPlan = {
throw new UnsupportedOperationException
}
Expand All @@ -257,15 +203,17 @@ object EmptyCatalog extends Catalog {
throw new UnsupportedOperationException
}

override def registerTable(tableIdentifier: Seq[String], plan: LogicalPlan): Unit = {
override def registerTable(tableIdent: TableIdentifier, plan: LogicalPlan): Unit = {
throw new UnsupportedOperationException
}

override def unregisterTable(tableIdentifier: Seq[String]): Unit = {
override def unregisterTable(tableIdent: TableIdentifier): Unit = {
throw new UnsupportedOperationException
}

override def unregisterAllTables(): Unit = {}
override def unregisterAllTables(): Unit = {
throw new UnsupportedOperationException
}

override def refreshTable(tableIdent: TableIdentifier): Unit = {
throw new UnsupportedOperationException
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.analysis

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
import org.apache.spark.sql.catalyst.errors
import org.apache.spark.sql.catalyst.{TableIdentifier, errors}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical.LeafNode
import org.apache.spark.sql.catalyst.trees.TreeNode
Expand All @@ -36,11 +36,11 @@ class UnresolvedException[TreeType <: TreeNode[_]](tree: TreeType, function: Str
* Holds the name of a relation that has yet to be looked up in a [[Catalog]].
*/
case class UnresolvedRelation(
tableIdentifier: Seq[String],
tableIdentifier: TableIdentifier,
alias: Option[String] = None) extends LeafNode {

/** Returns a `.` separated name for this relation. */
def tableName: String = tableIdentifier.mkString(".")
def tableName: String = tableIdentifier.unquotedString

override def output: Seq[Attribute] = Nil

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,8 @@ package object dsl {

def insertInto(tableName: String, overwrite: Boolean = false): LogicalPlan =
InsertIntoTable(
analysis.UnresolvedRelation(Seq(tableName)), Map.empty, logicalPlan, overwrite, false)
analysis.UnresolvedRelation(TableIdentifier(tableName)),
Map.empty, logicalPlan, overwrite, false)

def analyze: LogicalPlan = EliminateSubQueries(analysis.SimpleAnalyzer.execute(logicalPlan))
}
Expand Down
Loading

0 comments on commit 56d7da1

Please sign in to comment.