diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala index e09182dd8d5df..6b58b9322c4bf 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala @@ -31,6 +31,7 @@ trait Catalog { alias: Option[String] = None): LogicalPlan def registerTable(databaseName: Option[String], tableName: String, plan: LogicalPlan): Unit + def unregisterTable(databaseName: Option[String], tableName: String): Unit } class SimpleCatalog extends Catalog { @@ -40,7 +41,7 @@ class SimpleCatalog extends Catalog { tables += ((tableName, plan)) } - def dropTable(tableName: String) = tables -= tableName + def unregisterTable(databaseName: Option[String], tableName: String) = { tables -= tableName } def lookupRelation( databaseName: Option[String], @@ -87,6 +88,10 @@ trait OverrideCatalog extends Catalog { plan: LogicalPlan): Unit = { overrides.put((databaseName, tableName), plan) } + + override def unregisterTable(databaseName: Option[String], tableName: String): Unit = { + overrides.remove((databaseName, tableName)) + } } /** @@ -104,4 +109,8 @@ object EmptyCatalog extends Catalog { def registerTable(databaseName: Option[String], tableName: String, plan: LogicalPlan): Unit = { throw new UnsupportedOperationException } + + def unregisterTable(databaseName: Option[String], tableName: String): Unit = { + throw new UnsupportedOperationException + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index f950ea08ec57a..69bbbdc8943fa 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -26,8 +26,9 @@ import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.dsl import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.optimizer.Optimizer -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.plans.logical.{Subquery, LogicalPlan} import org.apache.spark.sql.catalyst.rules.RuleExecutor +import org.apache.spark.sql.columnar.InMemoryColumnarTableScan import org.apache.spark.sql.execution._ /** @@ -111,6 +112,35 @@ class SQLContext(@transient val sparkContext: SparkContext) result } + /** Returns the specified table as a SchemaRDD */ + def table(tableName: String): SchemaRDD = + new SchemaRDD(this, catalog.lookupRelation(None, tableName)) + + /** Caches the specified table in-memory. */ + def cacheTable(tableName: String): Unit = { + val currentTable = catalog.lookupRelation(None, tableName) + val asInMemoryRelation = + InMemoryColumnarTableScan(currentTable.output, executePlan(currentTable).executedPlan) + + catalog.registerTable(None, tableName, SparkLogicalPlan(asInMemoryRelation)) + } + + /** Removes the specified table from the in-memory cache. */ + def uncacheTable(tableName: String): Unit = { + EliminateAnalysisOperators(catalog.lookupRelation(None, tableName)) match { + // This is kind of a hack to make sure that if this was just an RDD registered as a table, + // we reregister the RDD as a table. + case SparkLogicalPlan(inMem @ InMemoryColumnarTableScan(_, e: ExistingRdd)) => + inMem.cachedColumnBuffers.unpersist() + catalog.unregisterTable(None, tableName) + catalog.registerTable(None, tableName, SparkLogicalPlan(e)) + case SparkLogicalPlan(inMem: InMemoryColumnarTableScan) => + inMem.cachedColumnBuffers.unpersist() + catalog.unregisterTable(None, tableName) + case plan => throw new IllegalArgumentException(s"Table $tableName is not cached: $plan") + } + } + protected[sql] class SparkPlanner extends SparkStrategies { val sparkContext = self.sparkContext diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala new file mode 100644 index 0000000000000..e5902c3cae381 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import org.scalatest.FunSuite +import org.apache.spark.sql.TestData._ +import org.apache.spark.sql.test.TestSQLContext +import org.apache.spark.sql.execution.SparkLogicalPlan +import org.apache.spark.sql.columnar.InMemoryColumnarTableScan + +class CachedTableSuite extends QueryTest { + TestData // Load test tables. + + test("read from cached table and uncache") { + TestSQLContext.cacheTable("testData") + + checkAnswer( + TestSQLContext.table("testData"), + testData.collect().toSeq + ) + + TestSQLContext.table("testData").queryExecution.analyzed match { + case SparkLogicalPlan(_ : InMemoryColumnarTableScan) => // Found evidence of caching + case noCache => fail(s"No cache node found in plan $noCache") + } + + TestSQLContext.uncacheTable("testData") + + checkAnswer( + TestSQLContext.table("testData"), + testData.collect().toSeq + ) + + TestSQLContext.table("testData").queryExecution.analyzed match { + case cachePlan @ SparkLogicalPlan(_ : InMemoryColumnarTableScan) => + fail(s"Table still cached after uncache: $cachePlan") + case noCache => // Table uncached successfully + } + } + + test("correct error on uncache of non-cached table") { + intercept[IllegalArgumentException] { + TestSQLContext.uncacheTable("testData") + } + } +} diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 4f8353666a12b..29834a11f41dc 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -141,6 +141,13 @@ class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with Logging { */ override def registerTable( databaseName: Option[String], tableName: String, plan: LogicalPlan): Unit = ??? + + /** + * UNIMPLEMENTED: It needs to be decided how we will persist in-memory tables to the metastore. + * For now, if this functionality is desired mix in the in-memory [[OverrideCatalog]]. + */ + override def unregisterTable( + databaseName: Option[String], tableName: String): Unit = ??? } object HiveMetastoreTypes extends RegexParsers { diff --git a/sql/hive/src/test/resources/golden/read from cached table-0-ce3797dc14a603cba2a5e58c8612de5b b/sql/hive/src/test/resources/golden/read from cached table-0-ce3797dc14a603cba2a5e58c8612de5b new file mode 100644 index 0000000000000..60878ffb77064 --- /dev/null +++ b/sql/hive/src/test/resources/golden/read from cached table-0-ce3797dc14a603cba2a5e58c8612de5b @@ -0,0 +1 @@ +238 val_238 diff --git a/sql/hive/src/test/resources/golden/read from uncached table-0-ce3797dc14a603cba2a5e58c8612de5b b/sql/hive/src/test/resources/golden/read from uncached table-0-ce3797dc14a603cba2a5e58c8612de5b new file mode 100644 index 0000000000000..60878ffb77064 --- /dev/null +++ b/sql/hive/src/test/resources/golden/read from uncached table-0-ce3797dc14a603cba2a5e58c8612de5b @@ -0,0 +1 @@ +238 val_238 diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala new file mode 100644 index 0000000000000..68d45e53cdf26 --- /dev/null +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive + +import org.apache.spark.sql.execution.SparkLogicalPlan +import org.apache.spark.sql.columnar.InMemoryColumnarTableScan +import org.apache.spark.sql.hive.execution.HiveComparisonTest + +class CachedTableSuite extends HiveComparisonTest { + TestHive.loadTestTable("src") + + test("cache table") { + TestHive.cacheTable("src") + } + + createQueryTest("read from cached table", + "SELECT * FROM src LIMIT 1") + + test("check that table is cached and uncache") { + TestHive.table("src").queryExecution.analyzed match { + case SparkLogicalPlan(_ : InMemoryColumnarTableScan) => // Found evidence of caching + case noCache => fail(s"No cache node found in plan $noCache") + } + TestHive.uncacheTable("src") + } + + createQueryTest("read from uncached table", + "SELECT * FROM src LIMIT 1") + + test("make sure table is uncached") { + TestHive.table("src").queryExecution.analyzed match { + case cachePlan @ SparkLogicalPlan(_ : InMemoryColumnarTableScan) => + fail(s"Table still cached after uncache: $cachePlan") + case noCache => // Table uncached successfully + } + } + + test("correct error on uncache of non-cached table") { + intercept[IllegalArgumentException] { + TestHive.uncacheTable("src") + } + } +}