Commit

First draft of Java API.
marmbrus committed Apr 1, 2014
1 parent ada310a commit 0b859c8
Showing 6 changed files with 262 additions and 37 deletions.
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.examples.sql;

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction;

import org.apache.spark.sql.api.java.JavaSQLContext;
import org.apache.spark.sql.api.java.JavaSchemaRDD;
import org.apache.spark.sql.api.java.JavaRow;

public final class JavaSparkSQL {
  public static void main(String[] args) throws Exception {
    JavaSparkContext ctx = new JavaSparkContext("local", "JavaSparkSQL",
      System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(JavaSparkSQL.class));
    JavaSQLContext sqlCtx = new JavaSQLContext(ctx);

    JavaSchemaRDD parquetFile = sqlCtx.parquetFile("pair.parquet");
    parquetFile.registerAsTable("parquet");

    JavaSchemaRDD queryResult = sqlCtx.sql("SELECT * FROM parquet");
    queryResult.foreach(new VoidFunction<JavaRow>() {
      @Override
      public void call(JavaRow row) throws Exception {
        System.out.println(row.get(0) + " " + row.get(1));
      }
    });
  }
}
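
Since JavaSchemaRDD mixes in SchemaRDDLike (added below), the query result above can also be written back out and reloaded. A minimal sketch continuing the `main` method; the `result.parquet` path and `result` table name are placeholders, not part of this commit:

    // Continues from queryResult above; "result.parquet" is a placeholder path.
    queryResult.saveAsParquetFile("result.parquet");               // writes data and schema
    JavaSchemaRDD reloaded = sqlCtx.parquetFile("result.parquet"); // read it back in
    reloaded.registerAsTable("result");                            // temporary table in sqlCtx
    System.out.println(sqlCtx.sql("SELECT * FROM result").count());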
42 changes: 5 additions & 37 deletions sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
@@ -17,13 +17,13 @@

package org.apache.spark.sql

import org.apache.spark.{Dependency, OneToOneDependency, Partition, TaskContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.plans.{Inner, JoinType}
import org.apache.spark.sql.catalyst.types.BooleanType
import org.apache.spark.{Dependency, OneToOneDependency, Partition, TaskContext}

/**
* <span class="badge" style="float: right; background-color: darkblue;">ALPHA COMPONENT</span>
@@ -92,23 +92,10 @@ import org.apache.spark.{Dependency, OneToOneDependency, Partition, TaskContext}
*/
class SchemaRDD(
    @transient val sqlContext: SQLContext,
    @transient val logicalPlan: LogicalPlan)
  extends RDD[Row](sqlContext.sparkContext, Nil) {
    @transient protected[spark] val logicalPlan: LogicalPlan)
  extends RDD[Row](sqlContext.sparkContext, Nil) with SchemaRDDLike {

  /**
   * A lazily computed query execution workflow. All other RDD operations are passed
   * through to the RDD that is produced by this workflow.
   *
   * We want this to be lazy because invoking the whole query optimization pipeline can be
   * expensive.
   */
  @transient
  protected[spark] lazy val queryExecution = sqlContext.executePlan(logicalPlan)

  override def toString =
    s"""${super.toString}
       |== Query Plan ==
       |${queryExecution.executedPlan}""".stripMargin.trim
  def baseSchemaRDD = this

  // =========================================================================================
  // RDD functions: Copy the internal row representation so we present immutable data to users.
@@ -312,31 +299,12 @@ class SchemaRDD(
    sqlContext,
    InsertIntoTable(UnresolvedRelation(None, tableName), Map.empty, logicalPlan, overwrite))

  /**
   * Saves the contents of this `SchemaRDD` as a parquet file, preserving the schema. Files that
   * are written out using this method can be read back in as a SchemaRDD using the `parquetFile`
   * function.
   *
   * @group schema
   */
  def saveAsParquetFile(path: String): Unit = {
    sqlContext.executePlan(WriteToFile(path, logicalPlan)).toRdd
  }

  /**
   * Registers this RDD as a temporary table using the given name. The lifetime of this temporary
   * table is tied to the [[SQLContext]] that was used to create this SchemaRDD.
   *
   * @group schema
   */
  def registerAsTable(tableName: String): Unit = {
    sqlContext.registerRDDAsTable(this, tableName)
  }

  /**
   * Returns this RDD as a SchemaRDD.
   * @group schema
   */
  def toSchemaRDD = this

  /** FOR INTERNAL USE ONLY */
  def analyze = sqlContext.analyzer(logicalPlan)
}
66 changes: 66 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql

import org.apache.spark.sql.catalyst.plans.logical._

/**
 * Contains functions that are shared between all SchemaRDD types (i.e., Scala, Java)
 */
trait SchemaRDDLike {
  @transient val sqlContext: SQLContext
  @transient protected[spark] val logicalPlan: LogicalPlan

  private[sql] def baseSchemaRDD: SchemaRDD

  /**
   * A lazily computed query execution workflow. All other RDD operations are passed
   * through to the RDD that is produced by this workflow.
   *
   * We want this to be lazy because invoking the whole query optimization pipeline can be
   * expensive.
   */
  @transient
  protected[spark] lazy val queryExecution = sqlContext.executePlan(logicalPlan)

  override def toString =
    s"""${super.toString}
       |== Query Plan ==
       |${queryExecution.executedPlan}""".stripMargin.trim

  /**
   * Saves the contents of this `SchemaRDD` as a parquet file, preserving the schema. Files that
   * are written out using this method can be read back in as a SchemaRDD using the `parquetFile`
   * function.
   *
   * @group schema
   */
  def saveAsParquetFile(path: String): Unit = {
    sqlContext.executePlan(WriteToFile(path, logicalPlan)).toRdd
  }

  /**
   * Registers this RDD as a temporary table using the given name. The lifetime of this temporary
   * table is tied to the [[SQLContext]] that was used to create this SchemaRDD.
   *
   * @group schema
   */
  def registerAsTable(tableName: String): Unit = {
    sqlContext.registerRDDAsTable(baseSchemaRDD, tableName)
  }
}
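
Because toString is defined here on the shared trait, printing any SchemaRDD, Scala or Java, surfaces its optimized query plan. A small sketch from the Java side, reusing the sqlCtx and the parquet table registered in the example file above:

    // toString comes from SchemaRDDLike, so the printed description ends with
    // an "== Query Plan ==" section showing the executed plan.
    JavaSchemaRDD plan = sqlCtx.sql("SELECT * FROM parquet");
    System.out.println(plan);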
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.api.java

import org.apache.spark.api.java.JavaSparkContext

import org.apache.spark.sql._

class JavaSQLContext(sparkContext: JavaSparkContext) {

  val sqlContext = new SQLContext(sparkContext)

  def sql(sqlQuery: String): JavaSchemaRDD = {
    val result = new JavaSchemaRDD(sqlContext, sqlContext.parseSql(sqlQuery))
    // We force query optimization to happen right away instead of letting it happen lazily as
    // it does when using the query DSL. This is so DDL commands behave as expected. For DML
    // queries this only generates the RDD lineage but does not perform any execution.
    result.queryExecution.toRdd
    result
  }

  /**
   * Loads a parquet file, returning the result as a [[JavaSchemaRDD]].
   */
  def parquetFile(path: String): JavaSchemaRDD =
    new JavaSchemaRDD(sqlContext, parquet.ParquetRelation("ParquetFile", path))

  /**
   * Registers the given RDD as a temporary table in the catalog. Temporary tables exist only
   * during the lifetime of this instance of SQLContext.
   */
  def registerRDDAsTable(rdd: JavaSchemaRDD, tableName: String): Unit = {
    sqlContext.registerRDDAsTable(rdd.baseSchemaRDD, tableName)
  }
}
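
registerRDDAsTable above and registerAsTable on SchemaRDDLike both delegate to the same SQLContext.registerRDDAsTable call, so either spelling registers the table. A sketch, where people.parquet is a hypothetical input file:

    JavaSchemaRDD people = sqlCtx.parquetFile("people.parquet"); // hypothetical path
    sqlCtx.registerRDDAsTable(people, "people");                 // via the context
    people.registerAsTable("people");                            // equivalent, via SchemaRDDLike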
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.api.java

import org.apache.spark.api.java.{JavaRDDLike, JavaRDD}
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.rdd.RDD

class JavaSchemaRDD(
    @transient val sqlContext: SQLContext,
    @transient protected[spark] val logicalPlan: LogicalPlan)
  extends JavaRDDLike[JavaRow, JavaRDD[JavaRow]]
  with SchemaRDDLike {

  private[sql] val baseSchemaRDD = new SchemaRDD(sqlContext, logicalPlan)

  override val classTag = scala.reflect.classTag[JavaRow]

  override def wrapRDD(rdd: RDD[JavaRow]): JavaRDD[JavaRow] = JavaRDD.fromRDD(rdd)

  val rdd = baseSchemaRDD.map(new JavaRow(_))
}
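
Since JavaSchemaRDD implements JavaRDDLike[JavaRow, JavaRDD[JavaRow]], the usual Java RDD operations apply directly to query results. A hedged sketch (with org.apache.spark.api.java.function.Function and java.util.List imported), assuming the parquet table from the example file:

    // map/collect come from JavaRDDLike; each element of the result is a JavaRow.
    List<String> firstColumn = sqlCtx.sql("SELECT * FROM parquet")
      .map(new Function<JavaRow, String>() {
        @Override
        public String call(JavaRow row) {
          return String.valueOf(row.get(0));
        }
      })
      .collect();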
58 changes: 58 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/api/java/Row.scala
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.api.java

import org.apache.spark.sql.catalyst.expressions.Row

/**
 * A result row from a SparkSQL query.
 */
class JavaRow(row: Row) {

  def length: Int = row.length

  def get(i: Int): Any =
    row(i)

  def isNullAt(i: Int) = get(i) == null

  def getInt(i: Int): Int =
    row.getInt(i)

  def getLong(i: Int): Long =
    row.getLong(i)

  def getDouble(i: Int): Double =
    row.getDouble(i)

  def getBoolean(i: Int): Boolean =
    row.getBoolean(i)

  def getShort(i: Int): Short =
    row.getShort(i)

  def getByte(i: Int): Byte =
    row.getByte(i)

  def getFloat(i: Int): Float =
    row.getFloat(i)

  def getString(i: Int): String =
    row.getString(i)
}
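
A short sketch of the typed accessors from Java, replacing the untyped get calls in the example file; the column types (long, String) are assumptions about pair.parquet, not something this commit specifies:

    queryResult.foreach(new VoidFunction<JavaRow>() {
      @Override
      public void call(JavaRow row) throws Exception {
        if (!row.isNullAt(0)) {               // isNullAt checks get(i) == null
          long key = row.getLong(0);          // assumed numeric first column
          String value = row.getString(1);    // assumed string second column
          System.out.println(key + ": " + value);
        }
      }
    });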
