[SQL] SPARK-1372 Support for caching and uncaching tables in a SQLContext.

This doesn't yet support different databases in Hive (though you can probably work around this by calling `USE <dbname>`). However, given the time constraints for 1.0, I think it's probably worth including this now and extending the functionality in the next release.

Author: Michael Armbrust <[email protected]>

Closes apache#282 from marmbrus/cacheTables and squashes the following commits:

83785db [Michael Armbrust] Support for caching and uncaching tables in a SQLContext.
marmbrus authored and rxin committed Apr 1, 2014
1 parent ada310a commit f5c418d
Showing 7 changed files with 169 additions and 2 deletions.
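
Before diving into the diff, here is a minimal usage sketch of the API this commit adds (`table`, `cacheTable`, `uncacheTable`). It assumes an existing `SparkContext` named `sc` and the RDD-to-SchemaRDD implicits of the 1.0-era API; the `Person` table is purely illustrative.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

case class Person(name: String, age: Int)

val sc: SparkContext = ???            // an existing SparkContext (assumed)
val sqlContext = new SQLContext(sc)
import sqlContext._                   // sql(...) plus the RDD-to-SchemaRDD implicit

// Register an ordinary RDD as a table (pre-existing API).
sc.parallelize(Seq(Person("alice", 30), Person("bob", 25))).registerAsTable("people")

// New in this commit: materialize the table in the in-memory columnar format.
sqlContext.cacheTable("people")
sql("SELECT name FROM people WHERE age > 26").collect()   // served from the cache

// Also new: look up a registered table as a SchemaRDD.
val people = sqlContext.table("people")

// Drop the cached copy; the table reverts to its original RDD-backed plan.
sqlContext.uncacheTable("people")
```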
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
@@ -31,6 +31,7 @@ trait Catalog {
alias: Option[String] = None): LogicalPlan

def registerTable(databaseName: Option[String], tableName: String, plan: LogicalPlan): Unit
def unregisterTable(databaseName: Option[String], tableName: String): Unit
}

class SimpleCatalog extends Catalog {
@@ -40,7 +41,7 @@ class SimpleCatalog extends Catalog {
tables += ((tableName, plan))
}

- def dropTable(tableName: String) = tables -= tableName
+ def unregisterTable(databaseName: Option[String], tableName: String) = { tables -= tableName }

def lookupRelation(
databaseName: Option[String],
@@ -87,6 +88,10 @@ trait OverrideCatalog extends Catalog {
plan: LogicalPlan): Unit = {
overrides.put((databaseName, tableName), plan)
}

override def unregisterTable(databaseName: Option[String], tableName: String): Unit = {
overrides.remove((databaseName, tableName))
}
}

/**
@@ -104,4 +109,8 @@ object EmptyCatalog extends Catalog {
def registerTable(databaseName: Option[String], tableName: String, plan: LogicalPlan): Unit = {
throw new UnsupportedOperationException
}

def unregisterTable(databaseName: Option[String], tableName: String): Unit = {
throw new UnsupportedOperationException
}
}
32 changes: 31 additions & 1 deletion sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -26,8 +26,9 @@ import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.dsl
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.optimizer.Optimizer
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.logical.{Subquery, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.RuleExecutor
import org.apache.spark.sql.columnar.InMemoryColumnarTableScan
import org.apache.spark.sql.execution._

/**
@@ -111,6 +112,35 @@ class SQLContext(@transient val sparkContext: SparkContext)
result
}

/** Returns the specified table as a SchemaRDD */
def table(tableName: String): SchemaRDD =
new SchemaRDD(this, catalog.lookupRelation(None, tableName))

/** Caches the specified table in-memory. */
def cacheTable(tableName: String): Unit = {
val currentTable = catalog.lookupRelation(None, tableName)
val asInMemoryRelation =
InMemoryColumnarTableScan(currentTable.output, executePlan(currentTable).executedPlan)

catalog.registerTable(None, tableName, SparkLogicalPlan(asInMemoryRelation))
}

/** Removes the specified table from the in-memory cache. */
def uncacheTable(tableName: String): Unit = {
EliminateAnalysisOperators(catalog.lookupRelation(None, tableName)) match {
// This is kind of a hack to make sure that, if the cached table was just an RDD
// registered as a table, we re-register the underlying RDD as the table.
case SparkLogicalPlan(inMem @ InMemoryColumnarTableScan(_, e: ExistingRdd)) =>
inMem.cachedColumnBuffers.unpersist()
catalog.unregisterTable(None, tableName)
catalog.registerTable(None, tableName, SparkLogicalPlan(e))
case SparkLogicalPlan(inMem: InMemoryColumnarTableScan) =>
inMem.cachedColumnBuffers.unpersist()
catalog.unregisterTable(None, tableName)
case plan => throw new IllegalArgumentException(s"Table $tableName is not cached: $plan")
}
}

protected[sql] class SparkPlanner extends SparkStrategies {
val sparkContext = self.sparkContext

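`cacheTable` works by looking up the table's current logical plan, running it through the planner, and re-registering the table as a `SparkLogicalPlan` wrapping an `InMemoryColumnarTableScan`; whether a table is cached is therefore visible in its analyzed plan. The test suites below pattern-match on exactly this. A small helper capturing the check (the name `isCached` is ours, not part of the commit, and like the suites it needs to live where `queryExecution` is accessible, i.e. inside Spark's `org.apache.spark.sql` package):

```scala
package org.apache.spark.sql

import org.apache.spark.sql.columnar.InMemoryColumnarTableScan
import org.apache.spark.sql.execution.SparkLogicalPlan

object CacheCheck {
  // True if the named table currently resolves to an in-memory columnar scan,
  // i.e. cacheTable() has swapped a cached plan into the catalog.
  def isCached(sqlContext: SQLContext, tableName: String): Boolean =
    sqlContext.table(tableName).queryExecution.analyzed match {
      case SparkLogicalPlan(_: InMemoryColumnarTableScan) => true
      case _ => false
    }
}
```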
61 changes: 61 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql

import org.scalatest.FunSuite
import org.apache.spark.sql.TestData._
import org.apache.spark.sql.test.TestSQLContext
import org.apache.spark.sql.execution.SparkLogicalPlan
import org.apache.spark.sql.columnar.InMemoryColumnarTableScan

class CachedTableSuite extends QueryTest {
TestData // Load test tables.

test("read from cached table and uncache") {
TestSQLContext.cacheTable("testData")

checkAnswer(
TestSQLContext.table("testData"),
testData.collect().toSeq
)

TestSQLContext.table("testData").queryExecution.analyzed match {
case SparkLogicalPlan(_ : InMemoryColumnarTableScan) => // Found evidence of caching
case noCache => fail(s"No cache node found in plan $noCache")
}

TestSQLContext.uncacheTable("testData")

checkAnswer(
TestSQLContext.table("testData"),
testData.collect().toSeq
)

TestSQLContext.table("testData").queryExecution.analyzed match {
case cachePlan @ SparkLogicalPlan(_ : InMemoryColumnarTableScan) =>
fail(s"Table still cached after uncache: $cachePlan")
case noCache => // Table uncached successfully
}
}

test("correct error on uncache of non-cached table") {
intercept[IllegalArgumentException] {
TestSQLContext.uncacheTable("testData")
}
}
}
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -141,6 +141,13 @@ class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with Logging {
*/
override def registerTable(
databaseName: Option[String], tableName: String, plan: LogicalPlan): Unit = ???

/**
* UNIMPLEMENTED: It needs to be decided how we will persist in-memory tables to the metastore.
* For now, if this functionality is desired, mix in the in-memory [[OverrideCatalog]].
*/
override def unregisterTable(
databaseName: Option[String], tableName: String): Unit = ???
}

object HiveMetastoreTypes extends RegexParsers {
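A sketch of the mix-in that comment suggests. That `HiveContext` exposes an overridable `catalog` of this shape is an assumption here (the wiring is not part of this diff), as is the `CachingHiveContext` name:

```scala
package org.apache.spark.sql.hive

import org.apache.spark.SparkContext
import org.apache.spark.sql.catalyst.analysis.OverrideCatalog

// Layer the in-memory OverrideCatalog over the metastore-backed catalog so
// that registerTable/unregisterTable (and therefore cacheTable/uncacheTable)
// work without persisting anything to the metastore.
class CachingHiveContext(sc: SparkContext) extends HiveContext(sc) {
  override protected[sql] lazy val catalog =
    new HiveMetastoreCatalog(this) with OverrideCatalog
}
```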
@@ -0,0 +1 @@
238 val_238
@@ -0,0 +1 @@
238 val_238
58 changes: 58 additions & 0 deletions sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.hive

import org.apache.spark.sql.execution.SparkLogicalPlan
import org.apache.spark.sql.columnar.InMemoryColumnarTableScan
import org.apache.spark.sql.hive.execution.HiveComparisonTest

class CachedTableSuite extends HiveComparisonTest {
TestHive.loadTestTable("src")

test("cache table") {
TestHive.cacheTable("src")
}

createQueryTest("read from cached table",
"SELECT * FROM src LIMIT 1")

test("check that table is cached and uncache") {
TestHive.table("src").queryExecution.analyzed match {
case SparkLogicalPlan(_ : InMemoryColumnarTableScan) => // Found evidence of caching
case noCache => fail(s"No cache node found in plan $noCache")
}
TestHive.uncacheTable("src")
}

createQueryTest("read from uncached table",
"SELECT * FROM src LIMIT 1")

test("make sure table is uncached") {
TestHive.table("src").queryExecution.analyzed match {
case cachePlan @ SparkLogicalPlan(_ : InMemoryColumnarTableScan) =>
fail(s"Table still cached after uncache: $cachePlan")
case noCache => // Table uncached successfully
}
}

test("correct error on uncache of non-cached table") {
intercept[IllegalArgumentException] {
TestHive.uncacheTable("src")
}
}
}
