SPARK-1416: PySpark support for SequenceFile and Hadoop InputFormats

So I finally resurrected this PR. It seems the old one against the incubator mirror is no longer available, so I cannot reference it.

This adds initial support for reading Hadoop ```SequenceFile```s, as well as arbitrary Hadoop ```InputFormat```s, in PySpark.

# Overview
The basics are as follows:
1. The ```PythonRDD``` object contains the relevant methods, which are in turn invoked by ```SparkContext``` in PySpark
2. The SequenceFile or InputFormat is read on the Scala side and converted from ```Writable``` instances to the relevant Scala classes (in the case of primitives)
3. Pyrolite is used to serialize Java objects. If this fails, the fallback is ```toString```
4. ```PickleSerializer``` on the Python side deserializes.

This works "out of the box" for simple ```Writable```s:
* ```Text```
* ```IntWritable```, ```DoubleWritable```, ```FloatWritable```
* ```NullWritable```
* ```BooleanWritable```
* ```BytesWritable```
* ```MapWritable```

It also works for simple, "struct-like" classes. Due to the way Pyrolite works, this requires that the classes satisfy the JavaBeans conventions (i.e. with fields, a no-arg constructor and getters/setters); see the sketch below. (Perhaps in future some sugar for case classes and reflection could be added).
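
For illustration, here is a minimal sketch of such a "struct-like" class, in the shape Pyrolite's bean handling expects (the class and fields are hypothetical, not part of this PR):

```scala
import scala.beans.BeanProperty

// Hypothetical "struct-like" class: @BeanProperty generates getter/setter pairs,
// and the auxiliary constructor supplies the no-arg constructor required by the
// JavaBeans convention, so Pyrolite can introspect and pickle instances.
class TestPoint(@BeanProperty var x: Double, @BeanProperty var y: Double) {
  def this() = this(0.0, 0.0)
}
```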

I've tested it out with ```EsInputFormat``` as an example and it works very nicely:
```python
conf = {"es.resource" : "index/type" }
rdd = sc.newAPIHadoopRDD("org.elasticsearch.hadoop.mr.EsInputFormat", "org.apache.hadoop.io.NullWritable", "org.elasticsearch.hadoop.mr.LinkedMapWritable", conf=conf)
rdd.first()
```

I suspect that for things like HBase/Cassandra it will be a bit trickier to get them to work out of the box.
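
The ```Converter``` interface added in this PR is the hook for such cases: a Scala/Java class can transform whatever the key/value class is into something Pyrolite can serialize. As a rough sketch (the converter below is hypothetical and purely illustrative), a converter that turns ```Text``` values into lower-cased Strings before pickling could look like:

```scala
import org.apache.hadoop.io.Text
import org.apache.spark.api.python.Converter

// Hypothetical converter: lower-case Text values before they are pickled by
// Pyrolite; anything that is not a Text instance is passed through unchanged.
class LowerCaseTextConverter extends Converter[Any, Any] {
  override def convert(obj: Any): Any = obj match {
    case t: Text => t.toString.toLowerCase
    case other => other
  }
}
```

The fully qualified class name would then be passed as the key or value converter argument from the Python side, and the class needs to be on the driver and executor classpath.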

# Some things still outstanding:
1. ~~Requires ```msgpack-python``` and will fail without it. As originally discussed with Josh, add an ```as_strings``` argument that defaults to ```False``` and can be used if ```msgpack-python``` is not available~~
2. ~~I see from apache#363 that Pyrolite is being used there for SerDe between Scala and Python. @ahirreddy @mateiz what is the plan behind this - is Pyrolite preferred? It seems from a cursory glance that adapting the ```msgpack```-based SerDe here to use Pyrolite wouldn't be too hard~~
3. ~~Support the key and value "wrapper" that would allow a Scala/Java function to be plugged in that would transform whatever the key/value Writable class is into something that can be serialized (e.g. convert some custom Writable to a JavaBean or ```java.util.Map``` that can be easily serialized)~~
4. Support ```saveAsSequenceFile``` and ```saveAsHadoopFile``` etc. This would require SerDe in the reverse direction, which can be handled by Pyrolite. Will work on this as a separate PR.

Author: Nick Pentreath <[email protected]>

Closes apache#455 from MLnick/pyspark-inputformats and squashes the following commits:

268df7e [Nick Pentreath] Documentation changes per @pwendell comments
761269b [Nick Pentreath] Address @pwendell comments, simplify default writable conversions and remove registry.
4c972d8 [Nick Pentreath] Add license headers
d150431 [Nick Pentreath] Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
cde6af9 [Nick Pentreath] Parameterize converter trait
5ebacfa [Nick Pentreath] Update docs for PySpark input formats
a985492 [Nick Pentreath] Move Converter examples to own package
365d0be [Nick Pentreath] Make classes private[python]. Add docs and @experimental annotation to Converter interface.
eeb8205 [Nick Pentreath] Fix path relative to SPARK_HOME in tests
1eaa08b [Nick Pentreath] HBase -> Cassandra app name oversight
3f90c3e [Nick Pentreath] Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
2c18513 [Nick Pentreath] Add examples for reading HBase and Cassandra InputFormats from Python
b65606f [Nick Pentreath] Add converter interface
5757f6e [Nick Pentreath] Default key/value classes for sequenceFile are None
085b55f [Nick Pentreath] Move input format tests to tests.py and clean up docs
43eb728 [Nick Pentreath] PySpark InputFormats docs into programming guide
94beedc [Nick Pentreath] Clean up args in PythonRDD. Set key/value converter defaults to None for PySpark context.py methods
1a4a1d6 [Nick Pentreath] Address @mateiz style comments
01e0813 [Nick Pentreath] Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
15a7d07 [Nick Pentreath] Remove default args for key/value classes. Arg names to camelCase
9fe6bd5 [Nick Pentreath] Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
84fe8e3 [Nick Pentreath] Python programming guide space formatting
d0f52b6 [Nick Pentreath] Python programming guide
7caa73a [Nick Pentreath] Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
93ef995 [Nick Pentreath] Add back context.py changes
9ef1896 [Nick Pentreath] Recover earlier changes lost in previous merge for serializers.py
077ecb2 [Nick Pentreath] Recover earlier changes lost in previous merge for context.py
5af4770 [Nick Pentreath] Merge branch 'master' into pyspark-inputformats
35b8e3a [Nick Pentreath] Another fix for test ordering
bef3afb [Nick Pentreath] Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
e001b94 [Nick Pentreath] Fix test failures due to ordering
78978d9 [Nick Pentreath] Add doc for SequenceFile and InputFormat support to Python programming guide
64eb051 [Nick Pentreath] Scalastyle fix
e7552fa [Nick Pentreath] Merge branch 'master' into pyspark-inputformats
44f2857 [Nick Pentreath] Remove msgpack dependency and switch serialization to Pyrolite, plus some clean up and refactoring
c0ebfb6 [Nick Pentreath] Change sequencefile test data generator to easily be called from PySpark tests
1d7c17c [Nick Pentreath] Amend tests to auto-generate sequencefile data in temp dir
17a656b [Nick Pentreath] remove binary sequencefile for tests
f60959e [Nick Pentreath] Remove msgpack dependency and serializer from PySpark
450e0a2 [Nick Pentreath] Merge branch 'master' into pyspark-inputformats
31a2fff [Nick Pentreath] Scalastyle fixes
fc5099e [Nick Pentreath] Add Apache license headers
4e08983 [Nick Pentreath] Clean up docs for PySpark context methods
b20ec7e [Nick Pentreath] Clean up merge duplicate dependencies
951c117 [Nick Pentreath] Merge branch 'master' into pyspark-inputformats
f6aac55 [Nick Pentreath] Bring back msgpack
9d2256e [Nick Pentreath] Merge branch 'master' into pyspark-inputformats
1bbbfb0 [Nick Pentreath] Clean up SparkBuild from merge
a67dfad [Nick Pentreath] Clean up Msgpack serialization and registering
7237263 [Nick Pentreath] Add back msgpack serializer and hadoop file code lost during merging
25da1ca [Nick Pentreath] Add generator for nulls, bools, bytes and maps
65360d5 [Nick Pentreath] Adding test SequenceFiles
0c612e5 [Nick Pentreath] Merge branch 'master' into pyspark-inputformats
d72bf18 [Nick Pentreath] msgpack
dd57922 [Nick Pentreath] Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
e67212a [Nick Pentreath] Add back msgpack dependency
f2d76a0 [Nick Pentreath] Merge branch 'master' into pyspark-inputformats
41856a5 [Nick Pentreath] Merge branch 'master' into pyspark-inputformats
97ef708 [Nick Pentreath] Remove old writeToStream
2beeedb [Nick Pentreath] Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
795a763 [Nick Pentreath] Change name to WriteInputFormatTestDataGenerator. Cleanup some var names. Use SPARK_HOME in path for writing test sequencefile data.
174f520 [Nick Pentreath] Add back graphx settings
703ee65 [Nick Pentreath] Add back msgpack
619c0fa [Nick Pentreath] Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
1c8efbc [Nick Pentreath] Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
eb40036 [Nick Pentreath] Remove unused comment lines
4d7ef2e [Nick Pentreath] Fix indentation
f1d73e3 [Nick Pentreath] mergeConfs returns a copy rather than mutating one of the input arguments
0f5cd84 [Nick Pentreath] Remove unused pair UTF8 class. Add comments to msgpack deserializer
4294cbb [Nick Pentreath] Add old Hadoop api methods. Clean up and expand comments. Clean up argument names
818a1e6 [Nick Pentreath] Add sequencefile and Hadoop InputFormat support to PythonRDD
4e7c9e3 [Nick Pentreath] Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
c304cc8 [Nick Pentreath] Adding supporting sequencefiles for tests. Cleaning up
4b0a43f [Nick Pentreath] Refactoring utils into own objects. Cleaning up old commented-out code
d86325f [Nick Pentreath] Initial WIP of PySpark support for SequenceFile and arbitrary Hadoop InputFormat
MLnick authored and pdeyhim committed Jun 25, 2014
1 parent 5f9b236 commit a034761
Showing 13 changed files with 1,140 additions and 6 deletions.
129 changes: 129 additions & 0 deletions core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
@@ -0,0 +1,129 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.api.python

import org.apache.spark.rdd.RDD
import org.apache.spark.Logging
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io._
import scala.util.{Failure, Success, Try}
import org.apache.spark.annotation.Experimental


/**
 * :: Experimental ::
 * A trait for use with reading custom classes in PySpark. Implement this trait and add custom
 * transformation code by overriding the convert method.
 */
@Experimental
trait Converter[T, U] extends Serializable {
  def convert(obj: T): U
}

private[python] object Converter extends Logging {

  def getInstance(converterClass: Option[String]): Converter[Any, Any] = {
    converterClass.map { cc =>
      Try {
        val c = Class.forName(cc).newInstance().asInstanceOf[Converter[Any, Any]]
        logInfo(s"Loaded converter: $cc")
        c
      } match {
        case Success(c) => c
        case Failure(err) =>
          logError(s"Failed to load converter: $cc")
          throw err
      }
    }.getOrElse { new DefaultConverter }
  }
}

/**
 * A converter that handles conversion of common [[org.apache.hadoop.io.Writable]] objects.
 * Other objects are passed through without conversion.
 */
private[python] class DefaultConverter extends Converter[Any, Any] {

  /**
   * Converts a [[org.apache.hadoop.io.Writable]] to the underlying primitive, String or
   * object representation
   */
  private def convertWritable(writable: Writable): Any = {
    import collection.JavaConversions._
    writable match {
      case iw: IntWritable => iw.get()
      case dw: DoubleWritable => dw.get()
      case lw: LongWritable => lw.get()
      case fw: FloatWritable => fw.get()
      case t: Text => t.toString
      case bw: BooleanWritable => bw.get()
      case byw: BytesWritable => byw.getBytes
      case n: NullWritable => null
      case aw: ArrayWritable => aw.get().map(convertWritable(_))
      case mw: MapWritable => mapAsJavaMap(mw.map { case (k, v) =>
        (convertWritable(k), convertWritable(v))
      }.toMap)
      case other => other
    }
  }

  def convert(obj: Any): Any = {
    obj match {
      case writable: Writable =>
        convertWritable(writable)
      case _ =>
        obj
    }
  }
}

/** Utilities for working with Python objects <-> Hadoop-related objects */
private[python] object PythonHadoopUtil {

  /**
   * Convert a [[java.util.Map]] of properties to a [[org.apache.hadoop.conf.Configuration]]
   */
  def mapToConf(map: java.util.Map[String, String]): Configuration = {
    import collection.JavaConversions._
    val conf = new Configuration()
    map.foreach{ case (k, v) => conf.set(k, v) }
    conf
  }

  /**
   * Merges two configurations, returns a copy of left with keys from right overwriting
   * any matching keys in left
   */
  def mergeConfs(left: Configuration, right: Configuration): Configuration = {
    import collection.JavaConversions._
    val copy = new Configuration(left)
    right.iterator().foreach(entry => copy.set(entry.getKey, entry.getValue))
    copy
  }

  /**
   * Converts an RDD of key-value pairs, where key and/or value could be instances of
   * [[org.apache.hadoop.io.Writable]], into an RDD[(K, V)]
   */
  def convertRDD[K, V](rdd: RDD[(K, V)],
                       keyConverter: Converter[Any, Any],
                       valueConverter: Converter[Any, Any]): RDD[(Any, Any)] = {
    rdd.map { case (k, v) => (keyConverter.convert(k), valueConverter.convert(v)) }
  }

}
179 changes: 178 additions & 1 deletion core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -28,6 +28,9 @@ import scala.util.Try

import net.razorvine.pickle.{Pickler, Unpickler}

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapred.{InputFormat, JobConf}
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
import org.apache.spark._
import org.apache.spark.api.java.{JavaSparkContext, JavaPairRDD, JavaRDD}
import org.apache.spark.broadcast.Broadcast
@@ -266,7 +269,7 @@ private object SpecialLengths {
  val TIMING_DATA = -3
}

private[spark] object PythonRDD {
private[spark] object PythonRDD extends Logging {
  val UTF8 = Charset.forName("UTF-8")

  /**
@@ -346,6 +349,180 @@ private[spark] object PythonRDD {
    }
  }

  /**
   * Create an RDD from a path using [[org.apache.hadoop.mapred.SequenceFileInputFormat]],
   * key and value class.
   * A key and/or value converter class can optionally be passed in
   * (see [[org.apache.spark.api.python.Converter]])
   */
  def sequenceFile[K, V](
      sc: JavaSparkContext,
      path: String,
      keyClassMaybeNull: String,
      valueClassMaybeNull: String,
      keyConverterClass: String,
      valueConverterClass: String,
      minSplits: Int) = {
    val keyClass = Option(keyClassMaybeNull).getOrElse("org.apache.hadoop.io.Text")
    val valueClass = Option(valueClassMaybeNull).getOrElse("org.apache.hadoop.io.Text")
    implicit val kcm = ClassTag(Class.forName(keyClass)).asInstanceOf[ClassTag[K]]
    implicit val vcm = ClassTag(Class.forName(valueClass)).asInstanceOf[ClassTag[V]]
    val kc = kcm.runtimeClass.asInstanceOf[Class[K]]
    val vc = vcm.runtimeClass.asInstanceOf[Class[V]]

    val rdd = sc.sc.sequenceFile[K, V](path, kc, vc, minSplits)
    val keyConverter = Converter.getInstance(Option(keyConverterClass))
    val valueConverter = Converter.getInstance(Option(valueConverterClass))
    val converted = PythonHadoopUtil.convertRDD[K, V](rdd, keyConverter, valueConverter)
    JavaRDD.fromRDD(SerDeUtil.rddToPython(converted))
  }

  /**
   * Create an RDD from a file path, using an arbitrary [[org.apache.hadoop.mapreduce.InputFormat]],
   * key and value class.
   * A key and/or value converter class can optionally be passed in
   * (see [[org.apache.spark.api.python.Converter]])
   */
  def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](
      sc: JavaSparkContext,
      path: String,
      inputFormatClass: String,
      keyClass: String,
      valueClass: String,
      keyConverterClass: String,
      valueConverterClass: String,
      confAsMap: java.util.HashMap[String, String]) = {
    val conf = PythonHadoopUtil.mapToConf(confAsMap)
    val baseConf = sc.hadoopConfiguration()
    val mergedConf = PythonHadoopUtil.mergeConfs(baseConf, conf)
    val rdd =
      newAPIHadoopRDDFromClassNames[K, V, F](sc,
        Some(path), inputFormatClass, keyClass, valueClass, mergedConf)
    val keyConverter = Converter.getInstance(Option(keyConverterClass))
    val valueConverter = Converter.getInstance(Option(valueConverterClass))
    val converted = PythonHadoopUtil.convertRDD[K, V](rdd, keyConverter, valueConverter)
    JavaRDD.fromRDD(SerDeUtil.rddToPython(converted))
  }

  /**
   * Create an RDD from a [[org.apache.hadoop.conf.Configuration]] converted from a map that is
   * passed in from Python, using an arbitrary [[org.apache.hadoop.mapreduce.InputFormat]],
   * key and value class.
   * A key and/or value converter class can optionally be passed in
   * (see [[org.apache.spark.api.python.Converter]])
   */
  def newAPIHadoopRDD[K, V, F <: NewInputFormat[K, V]](
      sc: JavaSparkContext,
      inputFormatClass: String,
      keyClass: String,
      valueClass: String,
      keyConverterClass: String,
      valueConverterClass: String,
      confAsMap: java.util.HashMap[String, String]) = {
    val conf = PythonHadoopUtil.mapToConf(confAsMap)
    val rdd =
      newAPIHadoopRDDFromClassNames[K, V, F](sc,
        None, inputFormatClass, keyClass, valueClass, conf)
    val keyConverter = Converter.getInstance(Option(keyConverterClass))
    val valueConverter = Converter.getInstance(Option(valueConverterClass))
    val converted = PythonHadoopUtil.convertRDD[K, V](rdd, keyConverter, valueConverter)
    JavaRDD.fromRDD(SerDeUtil.rddToPython(converted))
  }

  private def newAPIHadoopRDDFromClassNames[K, V, F <: NewInputFormat[K, V]](
      sc: JavaSparkContext,
      path: Option[String] = None,
      inputFormatClass: String,
      keyClass: String,
      valueClass: String,
      conf: Configuration) = {
    implicit val kcm = ClassTag(Class.forName(keyClass)).asInstanceOf[ClassTag[K]]
    implicit val vcm = ClassTag(Class.forName(valueClass)).asInstanceOf[ClassTag[V]]
    implicit val fcm = ClassTag(Class.forName(inputFormatClass)).asInstanceOf[ClassTag[F]]
    val kc = kcm.runtimeClass.asInstanceOf[Class[K]]
    val vc = vcm.runtimeClass.asInstanceOf[Class[V]]
    val fc = fcm.runtimeClass.asInstanceOf[Class[F]]
    val rdd = if (path.isDefined) {
      sc.sc.newAPIHadoopFile[K, V, F](path.get, fc, kc, vc, conf)
    } else {
      sc.sc.newAPIHadoopRDD[K, V, F](conf, fc, kc, vc)
    }
    rdd
  }

  /**
   * Create an RDD from a file path, using an arbitrary [[org.apache.hadoop.mapred.InputFormat]],
   * key and value class.
   * A key and/or value converter class can optionally be passed in
   * (see [[org.apache.spark.api.python.Converter]])
   */
  def hadoopFile[K, V, F <: InputFormat[K, V]](
      sc: JavaSparkContext,
      path: String,
      inputFormatClass: String,
      keyClass: String,
      valueClass: String,
      keyConverterClass: String,
      valueConverterClass: String,
      confAsMap: java.util.HashMap[String, String]) = {
    val conf = PythonHadoopUtil.mapToConf(confAsMap)
    val baseConf = sc.hadoopConfiguration()
    val mergedConf = PythonHadoopUtil.mergeConfs(baseConf, conf)
    val rdd =
      hadoopRDDFromClassNames[K, V, F](sc,
        Some(path), inputFormatClass, keyClass, valueClass, mergedConf)
    val keyConverter = Converter.getInstance(Option(keyConverterClass))
    val valueConverter = Converter.getInstance(Option(valueConverterClass))
    val converted = PythonHadoopUtil.convertRDD[K, V](rdd, keyConverter, valueConverter)
    JavaRDD.fromRDD(SerDeUtil.rddToPython(converted))
  }

  /**
   * Create an RDD from a [[org.apache.hadoop.conf.Configuration]] converted from a map
   * that is passed in from Python, using an arbitrary [[org.apache.hadoop.mapred.InputFormat]],
   * key and value class
   * A key and/or value converter class can optionally be passed in
   * (see [[org.apache.spark.api.python.Converter]])
   */
  def hadoopRDD[K, V, F <: InputFormat[K, V]](
      sc: JavaSparkContext,
      inputFormatClass: String,
      keyClass: String,
      valueClass: String,
      keyConverterClass: String,
      valueConverterClass: String,
      confAsMap: java.util.HashMap[String, String]) = {
    val conf = PythonHadoopUtil.mapToConf(confAsMap)
    val rdd =
      hadoopRDDFromClassNames[K, V, F](sc,
        None, inputFormatClass, keyClass, valueClass, conf)
    val keyConverter = Converter.getInstance(Option(keyConverterClass))
    val valueConverter = Converter.getInstance(Option(valueConverterClass))
    val converted = PythonHadoopUtil.convertRDD[K, V](rdd, keyConverter, valueConverter)
    JavaRDD.fromRDD(SerDeUtil.rddToPython(converted))
  }

  private def hadoopRDDFromClassNames[K, V, F <: InputFormat[K, V]](
      sc: JavaSparkContext,
      path: Option[String] = None,
      inputFormatClass: String,
      keyClass: String,
      valueClass: String,
      conf: Configuration) = {
    implicit val kcm = ClassTag(Class.forName(keyClass)).asInstanceOf[ClassTag[K]]
    implicit val vcm = ClassTag(Class.forName(valueClass)).asInstanceOf[ClassTag[V]]
    implicit val fcm = ClassTag(Class.forName(inputFormatClass)).asInstanceOf[ClassTag[F]]
    val kc = kcm.runtimeClass.asInstanceOf[Class[K]]
    val vc = vcm.runtimeClass.asInstanceOf[Class[V]]
    val fc = fcm.runtimeClass.asInstanceOf[Class[F]]
    val rdd = if (path.isDefined) {
      sc.sc.hadoopFile(path.get, fc, kc, vc)
    } else {
      sc.sc.hadoopRDD(new JobConf(conf), fc, kc, vc)
    }
    rdd
  }

  def writeUTF(str: String, dataOut: DataOutputStream) {
    val bytes = str.getBytes(UTF8)
    dataOut.writeInt(bytes.length)