Added table scan operator for in-memory columnar support.
Signed-off-by: Cheng Lian <[email protected]>
liancheng committed Mar 22, 2014
Commit 220ee1e (parent c701c7a)
Showing 3 changed files with 128 additions and 0 deletions.
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql
package columnar

import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.BaseRelation
import org.apache.spark.sql.execution.SparkPlan

/**
 * A simple class that wraps any SparkPlan as an in-memory columnar relation.
 */
case class InMemoryColumnarRelation(tableName: String, child: SparkPlan)
  extends BaseRelation {

  override def output: Seq[Attribute] = child.output
}
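A hypothetical usage sketch for orientation (somePlan and the "events" table name are illustrative assumptions, not part of this commit): the relation simply tags an already-planned SparkPlan with a table name and delegates its output schema to that child plan.

// Hypothetical usage; assumes `somePlan: SparkPlan` comes from a previously planned query.
val relation = InMemoryColumnarRelation("events", somePlan)
assert(relation.output == somePlan.output)  // the schema is taken verbatim from the child plan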
@@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql
package columnar

import org.apache.spark.sql.catalyst.expressions.{GenericMutableRow, Attribute}
import org.apache.spark.sql.columnar.ColumnType._
import org.apache.spark.sql.execution.LeafNode

case class InMemoryColumnarTableScan(
    attributes: Seq[Attribute],
    relation: InMemoryColumnarRelation)
  extends LeafNode {

  override def output: Seq[Attribute] = attributes

  lazy val cachedColumnBuffers = {
    val cached = relation.child.execute().mapPartitions { iterator =>
      // One builder per output column; rows are decomposed field by field.
      val columnBuilders = relation.child.output.map { a =>
        ColumnBuilder(a.dataType.typeId, 0, a.name)
      }.toArray

      var row: Row = null
      while (iterator.hasNext) {
        row = iterator.next()
        var i = 0
        while (i < row.length) {
          columnBuilders(i).appendFrom(row, i)
          i += 1
        }
      }

      // Each partition is compacted into a single element: one buffer per column.
      Iterator.single(columnBuilders.map(_.build()))
    }.cache()

    cached.setName(relation.child.toString)
    // Force the materialization of the cached RDD.
    cached.count()
    cached
  }

  override def execute() = {
    cachedColumnBuffers.mapPartitions { iterator =>
      // By construction, each cached partition holds exactly one array of column buffers.
      val columnBuffers = iterator.next()
      assert(!iterator.hasNext)

      new Iterator[Row] {
        val columnAccessors = columnBuffers.map(buffer => ColumnAccessor(buffer))
        val nextRow = new GenericMutableRow(columnAccessors.length)

        // Reuses a single mutable row: each column accessor writes its next value
        // into the corresponding field before the row is returned.
        override def next() = {
          var i = 0
          while (i < nextRow.length) {
            columnAccessors(i).extractTo(nextRow, i)
            i += 1
          }
          nextRow
        }

        override def hasNext = columnAccessors.head.hasNext
      }
    }
  }
}
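The scan's core pattern: per partition, every row is appended field by field into per-column builders, the builders are frozen into byte buffers, the buffers are cached, and reads replay them through column accessors. Below is a self-contained sketch of that round trip; IntColumnBuilder and IntColumnAccessor are simplified stand-ins for illustration, not Spark's ColumnBuilder/ColumnAccessor API.

import java.nio.ByteBuffer

// Simplified single-column builder; the real builders grow their buffers and
// record type metadata in a header.
class IntColumnBuilder {
  private val buffer = ByteBuffer.allocate(1024)
  def append(v: Int): Unit = buffer.putInt(v)
  def build(): ByteBuffer = { buffer.flip(); buffer }  // switch from write mode to read mode
}

// Simplified single-column accessor that replays a frozen buffer.
class IntColumnAccessor(buffer: ByteBuffer) {
  def hasNext: Boolean = buffer.hasRemaining
  def next(): Int = buffer.getInt()
}

// Build phase, as in cachedColumnBuffers above: append values into the column.
val builder = new IntColumnBuilder
Seq(1, 2, 3).foreach(builder.append)

// Scan phase, as in execute() above: replay the frozen buffer in order.
val accessor = new IntColumnAccessor(builder.build())
while (accessor.hasNext) println(accessor.next())  // prints 1, 2, 3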
@@ -0,0 +1,17 @@
package org.apache.spark.sql.columnar

import org.apache.spark.sql.{TestData, TestSqlContext, DslQueryTest}
import org.apache.spark.sql.execution.SparkLogicalPlan

class ColumnarQuerySuite extends DslQueryTest {
  import TestData._

  test("simple columnar query") {
    val plan = TestSqlContext.executePlan(testData).executedPlan
    val attributes = plan.output
    val relation = InMemoryColumnarRelation("t1", plan)
    val scan = SparkLogicalPlan(InMemoryColumnarTableScan(attributes, relation))

    checkAnswer(scan, testData.data)
  }
}
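One design choice worth flagging: the iterator returned by execute() hands back the same GenericMutableRow instance on every next() call, mutating it in place to avoid per-row allocation. Consumers that buffer rows without copying will therefore see aliased values. A self-contained illustration of the hazard; MutableCell is an illustrative stand-in, not a Spark class.

// Stand-in for a row that is mutated in place between next() calls.
class MutableCell(var value: Int)

val reused = new MutableCell(0)
// Buffering without copying: all three entries alias the one reused object...
val buffered = (1 to 3).map { i => reused.value = i; reused }
// ...so every buffered "cell" now shows the last value written.
assert(buffered.map(_.value) == Seq(3, 3, 3))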
