Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-20420][SQL] Add events to the external catalog #17710

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.apache.spark.sql.catalyst.catalog
import org.apache.spark.sql.catalyst.analysis.{FunctionAlreadyExistsException, NoSuchDatabaseException, NoSuchFunctionException, NoSuchTableException}
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.ListenerBus

/**
* Interface for the system catalog (of functions, partitions, tables, and databases).
Expand All @@ -30,7 +31,8 @@ import org.apache.spark.sql.types.StructType
*
* Implementations should throw [[NoSuchDatabaseException]] when databases don't exist.
*/
abstract class ExternalCatalog {
abstract class ExternalCatalog
extends ListenerBus[ExternalCatalogEventListener, ExternalCatalogEvent] {
import CatalogTypes.TablePartitionSpec

protected def requireDbExists(db: String): Unit = {
Expand Down Expand Up @@ -61,9 +63,22 @@ abstract class ExternalCatalog {
// Databases
// --------------------------------------------------------------------------

def createDatabase(dbDefinition: CatalogDatabase, ignoreIfExists: Boolean): Unit
final def createDatabase(dbDefinition: CatalogDatabase, ignoreIfExists: Boolean): Unit = {
val db = dbDefinition.name
postToAll(CreateDatabasePreEvent(db))
doCreateDatabase(dbDefinition, ignoreIfExists)
postToAll(CreateDatabaseEvent(db))
}

protected def doCreateDatabase(dbDefinition: CatalogDatabase, ignoreIfExists: Boolean): Unit

final def dropDatabase(db: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit = {
postToAll(DropDatabasePreEvent(db))
doDropDatabase(db, ignoreIfNotExists, cascade)
postToAll(DropDatabaseEvent(db))
}

def dropDatabase(db: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit
protected def doDropDatabase(db: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit

/**
* Alter a database whose name matches the one specified in `dbDefinition`,
Expand All @@ -88,11 +103,39 @@ abstract class ExternalCatalog {
// Tables
// --------------------------------------------------------------------------

def createTable(tableDefinition: CatalogTable, ignoreIfExists: Boolean): Unit
final def createTable(tableDefinition: CatalogTable, ignoreIfExists: Boolean): Unit = {
val db = tableDefinition.database
val name = tableDefinition.identifier.table
postToAll(CreateTablePreEvent(db, name))
doCreateTable(tableDefinition, ignoreIfExists)
postToAll(CreateTableEvent(db, name))
}

def dropTable(db: String, table: String, ignoreIfNotExists: Boolean, purge: Boolean): Unit
protected def doCreateTable(tableDefinition: CatalogTable, ignoreIfExists: Boolean): Unit

def renameTable(db: String, oldName: String, newName: String): Unit
final def dropTable(
db: String,
table: String,
ignoreIfNotExists: Boolean,
purge: Boolean): Unit = {
postToAll(DropTablePreEvent(db, table))
doDropTable(db, table, ignoreIfNotExists, purge)
postToAll(DropTableEvent(db, table))
}

protected def doDropTable(
db: String,
table: String,
ignoreIfNotExists: Boolean,
purge: Boolean): Unit

final def renameTable(db: String, oldName: String, newName: String): Unit = {
postToAll(RenameTablePreEvent(db, oldName, newName))
doRenameTable(db, oldName, newName)
postToAll(RenameTableEvent(db, oldName, newName))
}

protected def doRenameTable(db: String, oldName: String, newName: String): Unit

/**
* Alter a table whose database and name match the ones specified in `tableDefinition`, assuming
Expand Down Expand Up @@ -269,16 +312,40 @@ abstract class ExternalCatalog {
// Functions
// --------------------------------------------------------------------------

def createFunction(db: String, funcDefinition: CatalogFunction): Unit
final def createFunction(db: String, funcDefinition: CatalogFunction): Unit = {
val name = funcDefinition.identifier.funcName
postToAll(CreateFunctionPreEvent(db, name))
doCreateFunction(db, funcDefinition)
postToAll(CreateFunctionEvent(db, name))
}

def dropFunction(db: String, funcName: String): Unit
protected def doCreateFunction(db: String, funcDefinition: CatalogFunction): Unit

def renameFunction(db: String, oldName: String, newName: String): Unit
final def dropFunction(db: String, funcName: String): Unit = {
postToAll(DropFunctionPreEvent(db, funcName))
doDropFunction(db, funcName)
postToAll(DropFunctionEvent(db, funcName))
}

protected def doDropFunction(db: String, funcName: String): Unit

final def renameFunction(db: String, oldName: String, newName: String): Unit = {
postToAll(RenameFunctionPreEvent(db, oldName, newName))
doRenameFunction(db, oldName, newName)
postToAll(RenameFunctionEvent(db, oldName, newName))
}

protected def doRenameFunction(db: String, oldName: String, newName: String): Unit

def getFunction(db: String, funcName: String): CatalogFunction

def functionExists(db: String, funcName: String): Boolean

def listFunctions(db: String, pattern: String): Seq[String]

override protected def doPostEvent(
listener: ExternalCatalogEventListener,
event: ExternalCatalogEvent): Unit = {
listener.onEvent(event)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ class InMemoryCatalog(
// Databases
// --------------------------------------------------------------------------

override def createDatabase(
override protected def doCreateDatabase(
dbDefinition: CatalogDatabase,
ignoreIfExists: Boolean): Unit = synchronized {
if (catalog.contains(dbDefinition.name)) {
Expand All @@ -119,7 +119,7 @@ class InMemoryCatalog(
}
}

override def dropDatabase(
override protected def doDropDatabase(
db: String,
ignoreIfNotExists: Boolean,
cascade: Boolean): Unit = synchronized {
Expand Down Expand Up @@ -180,7 +180,7 @@ class InMemoryCatalog(
// Tables
// --------------------------------------------------------------------------

override def createTable(
override protected def doCreateTable(
tableDefinition: CatalogTable,
ignoreIfExists: Boolean): Unit = synchronized {
assert(tableDefinition.identifier.database.isDefined)
Expand Down Expand Up @@ -221,7 +221,7 @@ class InMemoryCatalog(
}
}

override def dropTable(
override protected def doDropTable(
db: String,
table: String,
ignoreIfNotExists: Boolean,
Expand Down Expand Up @@ -264,7 +264,10 @@ class InMemoryCatalog(
}
}

override def renameTable(db: String, oldName: String, newName: String): Unit = synchronized {
override protected def doRenameTable(
db: String,
oldName: String,
newName: String): Unit = synchronized {
requireTableExists(db, oldName)
requireTableNotExists(db, newName)
val oldDesc = catalog(db).tables(oldName)
Expand Down Expand Up @@ -565,18 +568,21 @@ class InMemoryCatalog(
// Functions
// --------------------------------------------------------------------------

override def createFunction(db: String, func: CatalogFunction): Unit = synchronized {
override protected def doCreateFunction(db: String, func: CatalogFunction): Unit = synchronized {
requireDbExists(db)
requireFunctionNotExists(db, func.identifier.funcName)
catalog(db).functions.put(func.identifier.funcName, func)
}

override def dropFunction(db: String, funcName: String): Unit = synchronized {
override protected def doDropFunction(db: String, funcName: String): Unit = synchronized {
requireFunctionExists(db, funcName)
catalog(db).functions.remove(funcName)
}

override def renameFunction(db: String, oldName: String, newName: String): Unit = synchronized {
override protected def doRenameFunction(
db: String,
oldName: String,
newName: String): Unit = synchronized {
requireFunctionExists(db, oldName)
requireFunctionNotExists(db, newName)
val newFunc = getFunction(db, oldName).copy(identifier = FunctionIdentifier(newName, Some(db)))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.catalyst.catalog

import org.apache.spark.scheduler.SparkListenerEvent

/**
* Event emitted by the external catalog when it is modified. Events are either fired before or
* after the modification (the event should document this).
*/
trait ExternalCatalogEvent extends SparkListenerEvent

/**
* Listener interface for external catalog modification events.
*/
trait ExternalCatalogEventListener {
def onEvent(event: ExternalCatalogEvent): Unit
}

/**
* Event fired when a database is create or dropped.
*/
trait DatabaseEvent extends ExternalCatalogEvent {
/**
* Database of the object that was touched.
*/
val database: String
}

/**
* Event fired before a database is created.
*/
case class CreateDatabasePreEvent(database: String) extends DatabaseEvent

/**
* Event fired after a database has been created.
*/
case class CreateDatabaseEvent(database: String) extends DatabaseEvent

/**
* Event fired before a database is dropped.
*/
case class DropDatabasePreEvent(database: String) extends DatabaseEvent

/**
* Event fired after a database has been dropped.
*/
case class DropDatabaseEvent(database: String) extends DatabaseEvent

/**
* Event fired when a table is created, dropped or renamed.
*/
trait TableEvent extends DatabaseEvent {
/**
* Name of the table that was touched.
*/
val name: String
}

/**
* Event fired before a table is created.
*/
case class CreateTablePreEvent(database: String, name: String) extends TableEvent

/**
* Event fired after a table has been created.
*/
case class CreateTableEvent(database: String, name: String) extends TableEvent

/**
* Event fired before a table is dropped.
*/
case class DropTablePreEvent(database: String, name: String) extends TableEvent

/**
* Event fired after a table has been dropped.
*/
case class DropTableEvent(database: String, name: String) extends TableEvent

/**
* Event fired before a table is renamed.
*/
case class RenameTablePreEvent(
database: String,
name: String,
newName: String)
extends TableEvent

/**
* Event fired after a table has been renamed.
*/
case class RenameTableEvent(
database: String,
name: String,
newName: String)
extends TableEvent

/**
* Event fired when a function is created, dropped or renamed.
*/
trait FunctionEvent extends DatabaseEvent {
/**
* Name of the function that was touched.
*/
val name: String
}

/**
* Event fired before a function is created.
*/
case class CreateFunctionPreEvent(database: String, name: String) extends FunctionEvent

/**
* Event fired after a function has been created.
*/
case class CreateFunctionEvent(database: String, name: String) extends FunctionEvent

/**
* Event fired before a function is dropped.
*/
case class DropFunctionPreEvent(database: String, name: String) extends FunctionEvent

/**
* Event fired after a function has been dropped.
*/
case class DropFunctionEvent(database: String, name: String) extends FunctionEvent

/**
* Event fired before a function is renamed.
*/
case class RenameFunctionPreEvent(
database: String,
name: String,
newName: String)
extends FunctionEvent

/**
* Event fired after a function has been renamed.
*/
case class RenameFunctionEvent(
database: String,
name: String,
newName: String)
extends FunctionEvent
Loading