Skip to content

Commit

Permalink
[SPARK-20420][SQL] Add events to the external catalog
Browse files Browse the repository at this point in the history
## What changes were proposed in this pull request?
It is often useful to be able to track changes to the `ExternalCatalog`. This PR makes the `ExternalCatalog` emit events when a catalog object is changed. Events are fired before and after the change.

The following events are fired per object:

- Database
  - CreateDatabasePreEvent: event fired before the database is created.
  - CreateDatabaseEvent: event fired after the database has been created.
  - DropDatabasePreEvent: event fired before the database is dropped.
  - DropDatabaseEvent: event fired after the database has been dropped.
- Table
  - CreateTablePreEvent: event fired before the table is created.
  - CreateTableEvent: event fired after the table has been created.
  - RenameTablePreEvent: event fired before the table is renamed.
  - RenameTableEvent: event fired after the table has been renamed.
  - DropTablePreEvent: event fired before the table is dropped.
  - DropTableEvent: event fired after the table has been dropped.
- Function
  - CreateFunctionPreEvent: event fired before the function is created.
  - CreateFunctionEvent: event fired after the function has been created.
  - RenameFunctionPreEvent: event fired before the function is renamed.
  - RenameFunctionEvent: event fired after the function has been renamed.
  - DropFunctionPreEvent: event fired before the function is dropped.
  - DropFunctionEvent: event fired after the function has been dropped.

The events currently contain only the names of the modified objects. We can add more events, and more details, at a later point.

A user can monitor changes to the external catalog by adding a listener to the Spark listener bus and checking for `ExternalCatalogEvent`s using the `SparkListener.onOtherEvent` hook. A more direct approach is to add a listener directly to the `ExternalCatalog`.

## How was this patch tested?
Added the `ExternalCatalogEventSuite`.

Author: Herman van Hovell <[email protected]>

Closes #17710 from hvanhovell/SPARK-20420.
  • Loading branch information
hvanhovell authored and rxin committed Apr 21, 2017
1 parent 48d760d commit e2b3d23
Show file tree
Hide file tree
Showing 6 changed files with 457 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.apache.spark.sql.catalyst.catalog
import org.apache.spark.sql.catalyst.analysis.{FunctionAlreadyExistsException, NoSuchDatabaseException, NoSuchFunctionException, NoSuchTableException}
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.ListenerBus

/**
* Interface for the system catalog (of functions, partitions, tables, and databases).
Expand All @@ -30,7 +31,8 @@ import org.apache.spark.sql.types.StructType
*
* Implementations should throw [[NoSuchDatabaseException]] when databases don't exist.
*/
abstract class ExternalCatalog {
abstract class ExternalCatalog
extends ListenerBus[ExternalCatalogEventListener, ExternalCatalogEvent] {
import CatalogTypes.TablePartitionSpec

protected def requireDbExists(db: String): Unit = {
Expand Down Expand Up @@ -61,9 +63,22 @@ abstract class ExternalCatalog {
// Databases
// --------------------------------------------------------------------------

def createDatabase(dbDefinition: CatalogDatabase, ignoreIfExists: Boolean): Unit
/**
* Creates a database and notifies listeners around the change.
*
* Posts a [[CreateDatabasePreEvent]] to all listeners, delegates the actual work to
* [[doCreateDatabase]], and then posts a [[CreateDatabaseEvent]]. Both events carry
* `dbDefinition.name`. There is no try/finally, so the post-event is only posted when
* `doCreateDatabase` returns normally.
*
* NOTE(review): both events are posted unconditionally, i.e. also when the implementation
* decides not to create anything because of `ignoreIfExists` -- confirm listeners expect this.
*
* @param dbDefinition definition of the database to create
* @param ignoreIfExists passed through unchanged to `doCreateDatabase`
*/
final def createDatabase(dbDefinition: CatalogDatabase, ignoreIfExists: Boolean): Unit = {
val db = dbDefinition.name
// Pre-event first, so listeners observe the intent before the catalog is modified.
postToAll(CreateDatabasePreEvent(db))
doCreateDatabase(dbDefinition, ignoreIfExists)
postToAll(CreateDatabaseEvent(db))
}

/**
* Implementation hook for [[createDatabase]]: performs the actual database creation.
* `createDatabase` calls it between posting the pre- and post-creation events.
*/
protected def doCreateDatabase(dbDefinition: CatalogDatabase, ignoreIfExists: Boolean): Unit

/**
* Drops a database and notifies listeners around the change.
*
* Posts a [[DropDatabasePreEvent]], delegates to [[doDropDatabase]], and then posts a
* [[DropDatabaseEvent]]. The post-event is only posted when `doDropDatabase` returns normally.
*
* @param db name of the database to drop
* @param ignoreIfNotExists passed through unchanged to `doDropDatabase`
* @param cascade passed through unchanged to `doDropDatabase`
*/
final def dropDatabase(db: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit = {
postToAll(DropDatabasePreEvent(db))
doDropDatabase(db, ignoreIfNotExists, cascade)
postToAll(DropDatabaseEvent(db))
}

def dropDatabase(db: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit
/**
* Implementation hook for [[dropDatabase]]: performs the actual database removal.
* `dropDatabase` calls it between posting the pre- and post-drop events.
*/
protected def doDropDatabase(db: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit

/**
* Alter a database whose name matches the one specified in `dbDefinition`,
Expand All @@ -88,11 +103,39 @@ abstract class ExternalCatalog {
// Tables
// --------------------------------------------------------------------------

def createTable(tableDefinition: CatalogTable, ignoreIfExists: Boolean): Unit
/**
* Creates a table and notifies listeners around the change.
*
* Posts a [[CreateTablePreEvent]], delegates to [[doCreateTable]], and then posts a
* [[CreateTableEvent]]. Both events carry `tableDefinition.database` and
* `tableDefinition.identifier.table`. The post-event is only posted when `doCreateTable`
* returns normally.
*
* @param tableDefinition definition of the table to create
* @param ignoreIfExists passed through unchanged to `doCreateTable`
*/
final def createTable(tableDefinition: CatalogTable, ignoreIfExists: Boolean): Unit = {
// Capture the identifying names once so the pre- and post-event report the same values.
val db = tableDefinition.database
val name = tableDefinition.identifier.table
postToAll(CreateTablePreEvent(db, name))
doCreateTable(tableDefinition, ignoreIfExists)
postToAll(CreateTableEvent(db, name))
}

def dropTable(db: String, table: String, ignoreIfNotExists: Boolean, purge: Boolean): Unit
/**
* Implementation hook for [[createTable]]: performs the actual table creation.
* `createTable` calls it between posting the pre- and post-creation events.
*/
protected def doCreateTable(tableDefinition: CatalogTable, ignoreIfExists: Boolean): Unit

def renameTable(db: String, oldName: String, newName: String): Unit
/**
* Drops a table and notifies listeners around the change.
*
* Posts a [[DropTablePreEvent]], delegates to [[doDropTable]], and then posts a
* [[DropTableEvent]]. The post-event is only posted when `doDropTable` returns normally.
*
* @param db database containing the table
* @param table name of the table to drop
* @param ignoreIfNotExists passed through unchanged to `doDropTable`
* @param purge passed through unchanged to `doDropTable`
*/
final def dropTable(
db: String,
table: String,
ignoreIfNotExists: Boolean,
purge: Boolean): Unit = {
postToAll(DropTablePreEvent(db, table))
doDropTable(db, table, ignoreIfNotExists, purge)
postToAll(DropTableEvent(db, table))
}

/**
* Implementation hook for [[dropTable]]: performs the actual table removal.
* `dropTable` calls it between posting the pre- and post-drop events.
*/
protected def doDropTable(
db: String,
table: String,
ignoreIfNotExists: Boolean,
purge: Boolean): Unit

/**
* Renames a table and notifies listeners around the change.
*
* Posts a [[RenameTablePreEvent]], delegates to [[doRenameTable]], and then posts a
* [[RenameTableEvent]]. Both events carry the database, the old name and the new name.
* The post-event is only posted when `doRenameTable` returns normally.
*
* @param db database containing the table
* @param oldName current name of the table
* @param newName name the table is renamed to
*/
final def renameTable(db: String, oldName: String, newName: String): Unit = {
postToAll(RenameTablePreEvent(db, oldName, newName))
doRenameTable(db, oldName, newName)
postToAll(RenameTableEvent(db, oldName, newName))
}

/**
* Implementation hook for [[renameTable]]: performs the actual table rename.
* `renameTable` calls it between posting the pre- and post-rename events.
*/
protected def doRenameTable(db: String, oldName: String, newName: String): Unit

/**
* Alter a table whose database and name match the ones specified in `tableDefinition`, assuming
Expand Down Expand Up @@ -269,16 +312,40 @@ abstract class ExternalCatalog {
// Functions
// --------------------------------------------------------------------------

def createFunction(db: String, funcDefinition: CatalogFunction): Unit
/**
* Creates a function and notifies listeners around the change.
*
* Posts a [[CreateFunctionPreEvent]], delegates to [[doCreateFunction]], and then posts a
* [[CreateFunctionEvent]]. Both events carry `db` and `funcDefinition.identifier.funcName`.
* The post-event is only posted when `doCreateFunction` returns normally.
*
* @param db database in which the function is created
* @param funcDefinition definition of the function to create
*/
final def createFunction(db: String, funcDefinition: CatalogFunction): Unit = {
val name = funcDefinition.identifier.funcName
postToAll(CreateFunctionPreEvent(db, name))
doCreateFunction(db, funcDefinition)
postToAll(CreateFunctionEvent(db, name))
}

def dropFunction(db: String, funcName: String): Unit
/**
* Implementation hook for [[createFunction]]: performs the actual function creation.
* `createFunction` calls it between posting the pre- and post-creation events.
*/
protected def doCreateFunction(db: String, funcDefinition: CatalogFunction): Unit

def renameFunction(db: String, oldName: String, newName: String): Unit
/**
* Drops a function and notifies listeners around the change.
*
* Posts a [[DropFunctionPreEvent]], delegates to [[doDropFunction]], and then posts a
* [[DropFunctionEvent]]. The post-event is only posted when `doDropFunction` returns normally.
*
* @param db database containing the function
* @param funcName name of the function to drop
*/
final def dropFunction(db: String, funcName: String): Unit = {
postToAll(DropFunctionPreEvent(db, funcName))
doDropFunction(db, funcName)
postToAll(DropFunctionEvent(db, funcName))
}

/**
* Implementation hook for [[dropFunction]]: performs the actual function removal.
* `dropFunction` calls it between posting the pre- and post-drop events.
*/
protected def doDropFunction(db: String, funcName: String): Unit

/**
* Renames a function and notifies listeners around the change.
*
* Posts a [[RenameFunctionPreEvent]], delegates to [[doRenameFunction]], and then posts a
* [[RenameFunctionEvent]]. Both events carry the database, the old name and the new name.
* The post-event is only posted when `doRenameFunction` returns normally.
*
* @param db database containing the function
* @param oldName current name of the function
* @param newName name the function is renamed to
*/
final def renameFunction(db: String, oldName: String, newName: String): Unit = {
postToAll(RenameFunctionPreEvent(db, oldName, newName))
doRenameFunction(db, oldName, newName)
postToAll(RenameFunctionEvent(db, oldName, newName))
}

/**
* Implementation hook for [[renameFunction]]: performs the actual function rename.
* `renameFunction` calls it between posting the pre- and post-rename events.
*/
protected def doRenameFunction(db: String, oldName: String, newName: String): Unit

def getFunction(db: String, funcName: String): CatalogFunction

def functionExists(db: String, funcName: String): Boolean

def listFunctions(db: String, pattern: String): Seq[String]

/**
* Delivers a single event to a single listener by calling the listener's `onEvent` method.
* This overrides the per-listener dispatch hook inherited from the listener bus this class
* extends.
*
* @param listener the listener to notify
* @param event the catalog event to deliver
*/
override protected def doPostEvent(
listener: ExternalCatalogEventListener,
event: ExternalCatalogEvent): Unit = {
listener.onEvent(event)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ class InMemoryCatalog(
// Databases
// --------------------------------------------------------------------------

override def createDatabase(
override protected def doCreateDatabase(
dbDefinition: CatalogDatabase,
ignoreIfExists: Boolean): Unit = synchronized {
if (catalog.contains(dbDefinition.name)) {
Expand All @@ -119,7 +119,7 @@ class InMemoryCatalog(
}
}

override def dropDatabase(
override protected def doDropDatabase(
db: String,
ignoreIfNotExists: Boolean,
cascade: Boolean): Unit = synchronized {
Expand Down Expand Up @@ -180,7 +180,7 @@ class InMemoryCatalog(
// Tables
// --------------------------------------------------------------------------

override def createTable(
override protected def doCreateTable(
tableDefinition: CatalogTable,
ignoreIfExists: Boolean): Unit = synchronized {
assert(tableDefinition.identifier.database.isDefined)
Expand Down Expand Up @@ -221,7 +221,7 @@ class InMemoryCatalog(
}
}

override def dropTable(
override protected def doDropTable(
db: String,
table: String,
ignoreIfNotExists: Boolean,
Expand Down Expand Up @@ -264,7 +264,10 @@ class InMemoryCatalog(
}
}

override def renameTable(db: String, oldName: String, newName: String): Unit = synchronized {
override protected def doRenameTable(
db: String,
oldName: String,
newName: String): Unit = synchronized {
requireTableExists(db, oldName)
requireTableNotExists(db, newName)
val oldDesc = catalog(db).tables(oldName)
Expand Down Expand Up @@ -565,18 +568,21 @@ class InMemoryCatalog(
// Functions
// --------------------------------------------------------------------------

override def createFunction(db: String, func: CatalogFunction): Unit = synchronized {
/**
* Registers `func` in database `db`, keyed by `func.identifier.funcName`.
*
* The whole body runs inside `synchronized`. Before storing the function it calls
* `requireDbExists` and `requireFunctionNotExists`.
* NOTE(review): these helpers presumably throw when the precondition fails; their bodies are
* not visible here -- confirm.
*/
override protected def doCreateFunction(db: String, func: CatalogFunction): Unit = synchronized {
requireDbExists(db)
requireFunctionNotExists(db, func.identifier.funcName)
catalog(db).functions.put(func.identifier.funcName, func)
}

override def dropFunction(db: String, funcName: String): Unit = synchronized {
/**
* Removes the function named `funcName` from database `db`.
*
* The whole body runs inside `synchronized`. It calls `requireFunctionExists` before removing
* the entry.
* NOTE(review): the helper presumably throws when the function is missing; its body is not
* visible here -- confirm.
*/
override protected def doDropFunction(db: String, funcName: String): Unit = synchronized {
requireFunctionExists(db, funcName)
catalog(db).functions.remove(funcName)
}

override def renameFunction(db: String, oldName: String, newName: String): Unit = synchronized {
override protected def doRenameFunction(
db: String,
oldName: String,
newName: String): Unit = synchronized {
requireFunctionExists(db, oldName)
requireFunctionNotExists(db, newName)
val newFunc = getFunction(db, oldName).copy(identifier = FunctionIdentifier(newName, Some(db)))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.catalyst.catalog

import org.apache.spark.scheduler.SparkListenerEvent

/**
* Event emitted by the external catalog when it is modified. Events are either fired before or
* after the modification (the event should document this).
*/
trait ExternalCatalogEvent extends SparkListenerEvent

/**
* Listener interface for external catalog modification events.
*/
trait ExternalCatalogEventListener {
/**
* Callback invoked with each [[ExternalCatalogEvent]] delivered to this listener.
*
* @param event the catalog event being delivered
*/
def onEvent(event: ExternalCatalogEvent): Unit
}

/**
* Event fired when a database is created or dropped. It is also the base trait of
* [[TableEvent]] and [[FunctionEvent]], which use `database` for the database that contains
* the table or function.
*/
trait DatabaseEvent extends ExternalCatalogEvent {
/**
* Database of the object that was touched.
*/
val database: String
}

/**
* Event fired before a database is created.
*/
case class CreateDatabasePreEvent(database: String) extends DatabaseEvent

/**
* Event fired after a database has been created.
*/
case class CreateDatabaseEvent(database: String) extends DatabaseEvent

/**
* Event fired before a database is dropped.
*/
case class DropDatabasePreEvent(database: String) extends DatabaseEvent

/**
* Event fired after a database has been dropped.
*/
case class DropDatabaseEvent(database: String) extends DatabaseEvent

/**
* Event fired when a table is created, dropped or renamed.
*/
trait TableEvent extends DatabaseEvent {
/**
* Name of the table that was touched.
*/
val name: String
}

/**
* Event fired before a table is created.
*/
case class CreateTablePreEvent(database: String, name: String) extends TableEvent

/**
* Event fired after a table has been created.
*/
case class CreateTableEvent(database: String, name: String) extends TableEvent

/**
* Event fired before a table is dropped.
*/
case class DropTablePreEvent(database: String, name: String) extends TableEvent

/**
* Event fired after a table has been dropped.
*/
case class DropTableEvent(database: String, name: String) extends TableEvent

/**
* Event fired before a table is renamed.
*
* @param database database containing the table
* @param name name of the table before the rename
* @param newName name the table is about to be renamed to
*/
case class RenameTablePreEvent(
database: String,
name: String,
newName: String)
extends TableEvent

/**
* Event fired after a table has been renamed.
*
* @param database database containing the table
* @param name name of the table before the rename
* @param newName name of the table after the rename
*/
case class RenameTableEvent(
database: String,
name: String,
newName: String)
extends TableEvent

/**
* Event fired when a function is created, dropped or renamed.
*/
trait FunctionEvent extends DatabaseEvent {
/**
* Name of the function that was touched.
*/
val name: String
}

/**
* Event fired before a function is created.
*/
case class CreateFunctionPreEvent(database: String, name: String) extends FunctionEvent

/**
* Event fired after a function has been created.
*/
case class CreateFunctionEvent(database: String, name: String) extends FunctionEvent

/**
* Event fired before a function is dropped.
*/
case class DropFunctionPreEvent(database: String, name: String) extends FunctionEvent

/**
* Event fired after a function has been dropped.
*/
case class DropFunctionEvent(database: String, name: String) extends FunctionEvent

/**
* Event fired before a function is renamed.
*
* @param database database containing the function
* @param name name of the function before the rename
* @param newName name the function is about to be renamed to
*/
case class RenameFunctionPreEvent(
database: String,
name: String,
newName: String)
extends FunctionEvent

/**
* Event fired after a function has been renamed.
*
* @param database database containing the function
* @param name name of the function before the rename
* @param newName name of the function after the rename
*/
case class RenameFunctionEvent(
database: String,
name: String,
newName: String)
extends FunctionEvent
Loading

0 comments on commit e2b3d23

Please sign in to comment.