Skip to content

Commit

Permalink
Remove attributes field from InMemoryColumnarTableScan
Browse files (browse the repository at this point in the history)
  • Loading branch information
liancheng committed Apr 11, 2014
1 parent: 32cc9ce; commit: 882c538
Show file tree
Hide file tree
Showing 6 changed files with 14 additions and 17 deletions.
4 changes: 2 additions & 2 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
def cacheTable(tableName: String): Unit = {
val currentTable = catalog.lookupRelation(None, tableName)
val asInMemoryRelation =
InMemoryColumnarTableScan(currentTable.output, executePlan(currentTable).executedPlan)
InMemoryColumnarTableScan(executePlan(currentTable).executedPlan)

catalog.registerTable(None, tableName, SparkLogicalPlan(asInMemoryRelation))
}
Expand All @@ -131,7 +131,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
EliminateAnalysisOperators(catalog.lookupRelation(None, tableName)) match {
// This is kind of a hack to make sure that if this was just an RDD registered as a table,
// we reregister the RDD as a table.
case SparkLogicalPlan(inMem @ InMemoryColumnarTableScan(_, e: ExistingRdd)) =>
case SparkLogicalPlan(inMem @ InMemoryColumnarTableScan(e: ExistingRdd)) =>
inMem.cachedColumnBuffers.unpersist()
catalog.unregisterTable(None, tableName)
catalog.registerTable(None, tableName, SparkLogicalPlan(e))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,26 +21,24 @@ import org.apache.spark.sql.catalyst.expressions.{GenericMutableRow, Attribute}
import org.apache.spark.sql.execution.{SparkPlan, LeafNode}
import org.apache.spark.sql.Row

private[sql] case class InMemoryColumnarTableScan(attributes: Seq[Attribute], child: SparkPlan)
private[sql] case class InMemoryColumnarTableScan(child: SparkPlan)
extends LeafNode {

override def output: Seq[Attribute] = attributes
override def output: Seq[Attribute] = child.output

lazy val cachedColumnBuffers = {
val ordinals = attributes.map(a => child.output.indexWhere(_.name == a.name))
val output = child.output
val childOutput = child.output
val cached = child.execute().mapPartitions { iterator =>
val columnBuilders = ordinals.map { i =>
val attribute = output(i)
val columnBuilders = childOutput.map { attribute =>
ColumnBuilder(ColumnType(attribute.dataType).typeId, 0, attribute.name)
}.toArray

var row: Row = null
while (iterator.hasNext) {
row = iterator.next()
var i = 0
while (i < ordinals.length) {
columnBuilders(i).appendFrom(row, ordinals(i))
while (i < childOutput.length) {
columnBuilders(i).appendFrom(row, i)
i += 1
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,7 @@ case class SparkLogicalPlan(alreadyPlanned: SparkPlan)
SparkLogicalPlan(
alreadyPlanned match {
case ExistingRdd(output, rdd) => ExistingRdd(output.map(_.newInstance), rdd)
case InMemoryColumnarTableScan(output, child) =>
InMemoryColumnarTableScan(output.map(_.newInstance), child)
case scan @ InMemoryColumnarTableScan(child) => scan
case _ => sys.error("Multiple instance of the same relation detected.")
}).asInstanceOf[this.type]
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,14 @@ class InMemoryColumnarQuerySuite extends QueryTest {

test("simple columnar query") {
val plan = TestSQLContext.executePlan(testData.logicalPlan).executedPlan
val scan = SparkLogicalPlan(InMemoryColumnarTableScan(plan.output, plan))
val scan = SparkLogicalPlan(InMemoryColumnarTableScan(plan))

checkAnswer(scan, testData.collect().toSeq)
}

test("projection") {
val plan = TestSQLContext.executePlan(testData.select('value, 'key).logicalPlan).executedPlan
val scan = SparkLogicalPlan(InMemoryColumnarTableScan(plan.output, plan))
val scan = SparkLogicalPlan(InMemoryColumnarTableScan(plan))

checkAnswer(scan, testData.collect().map {
case Row(key: Int, value: String) => value -> key
Expand All @@ -44,7 +44,7 @@ class InMemoryColumnarQuerySuite extends QueryTest {

test("SPARK-1436 regression: in-memory columns must be able to be accessed multiple times") {
val plan = TestSQLContext.executePlan(testData.logicalPlan).executedPlan
val scan = SparkLogicalPlan(InMemoryColumnarTableScan(plan.output, plan))
val scan = SparkLogicalPlan(InMemoryColumnarTableScan(plan))

checkAnswer(scan, testData.collect().toSeq)
checkAnswer(scan, testData.collect().toSeq)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with Logging {
castChildOutput(p, table, child)

case p @ logical.InsertIntoTable(SparkLogicalPlan(InMemoryColumnarTableScan(
_, HiveTableScan(_, table, _))), _, child, _) =>
HiveTableScan(_, table, _))), _, child, _) =>
castChildOutput(p, table, child)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ trait HiveStrategies {
case logical.InsertIntoTable(table: MetastoreRelation, partition, child, overwrite) =>
InsertIntoHiveTable(table, partition, planLater(child), overwrite)(hiveContext) :: Nil
case logical.InsertIntoTable(SparkLogicalPlan(InMemoryColumnarTableScan(
_, HiveTableScan(_, table, _))), partition, child, overwrite) =>
HiveTableScan(_, table, _))), partition, child, overwrite) =>
InsertIntoHiveTable(table, partition, planLater(child), overwrite)(hiveContext) :: Nil
case _ => Nil
}
Expand Down

0 comments on commit 882c538

Please sign in to comment.