[SPARK-4186] add binaryFiles and binaryRecords in Python
add binaryFiles() and binaryRecords() in Python
```
binaryFiles(self, path, minPartitions=None):
    :: Developer API ::

    Read a directory of binary files from HDFS, a local file system
    (available on all nodes), or any Hadoop-supported file system URI
    as a byte array. Each file is read as a single record and returned
    in a key-value pair, where the key is the path of each file, the
    value is the content of each file.

    Note: Small files are preferred; large files are also allowed, but
    may cause bad performance.

binaryRecords(self, path, recordLength):
    Load data from a flat binary file, assuming each record is a set of numbers
    with the specified numerical format (see ByteBuffer), and the number of
    bytes per record is constant.

    :param path: Directory to the input data files
    :param recordLength: The length at which to split the records
```
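
For illustration, a minimal PySpark usage sketch of the two new methods; the input paths and the assumed record layout (two big-endian 32-bit integers per 8-byte record) are hypothetical:

```
from pyspark import SparkContext
import struct

sc = SparkContext("local", "binary-io-example")

# binaryFiles: each element is (file path, file contents as a raw byte string).
# "hdfs:///data/blobs" is a hypothetical input directory.
sizes = sc.binaryFiles("hdfs:///data/blobs").mapValues(len).collect()

# binaryRecords: split flat binary files into fixed-length records.
# Here each 8-byte record is assumed to hold two big-endian 32-bit integers.
pairs = (sc.binaryRecords("hdfs:///data/records.bin", 8)
           .map(lambda r: struct.unpack(">ii", r))
           .collect())
```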

Author: Davies Liu <[email protected]>

Closes apache#3078 from davies/binary and squashes the following commits:

cd0bdbd [Davies Liu] Merge branch 'master' of github.com:apache/spark into binary
3aa349b [Davies Liu] add experimental notes
24e84b6 [Davies Liu] Merge branch 'master' of github.com:apache/spark into binary
5ceaa8a [Davies Liu] Merge branch 'master' of github.com:apache/spark into binary
1900085 [Davies Liu] bugfix
bb22442 [Davies Liu] add binaryFiles and binaryRecords in Python
Davies Liu authored and mateiz committed Nov 6, 2014
1 parent 5f27ae1 commit b41a39e
Showing 5 changed files with 90 additions and 22 deletions.
4 changes: 4 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -560,6 +560,8 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {


/**
* :: Experimental ::
*
* Get an RDD for a Hadoop-readable dataset as PortableDataStream for each file
* (useful for binary data)
*
@@ -602,6 +604,8 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {
}

/**
* :: Experimental ::
*
* Load data from a flat binary file, assuming the length of each record is constant.
*
* @param path Directory to the input data files
12 changes: 7 additions & 5 deletions core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -21,18 +21,14 @@ import java.io.Closeable
import java.util
import java.util.{Map => JMap}

import java.io.DataInputStream

import org.apache.hadoop.io.{BytesWritable, LongWritable}
import org.apache.spark.input.{PortableDataStream, FixedLengthBinaryInputFormat}

import scala.collection.JavaConversions
import scala.collection.JavaConversions._
import scala.language.implicitConversions
import scala.reflect.ClassTag

import com.google.common.base.Optional
import org.apache.hadoop.conf.Configuration
import org.apache.spark.input.PortableDataStream
import org.apache.hadoop.mapred.{InputFormat, JobConf}
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}

@@ -286,6 +282,8 @@ class JavaSparkContext(val sc: SparkContext)
new JavaPairRDD(sc.binaryFiles(path, minPartitions))

/**
* :: Experimental ::
*
* Read a directory of binary files from HDFS, a local file system (available on all nodes),
* or any Hadoop-supported file system URI as a byte array. Each file is read as a single
* record and returned in a key-value pair, where the key is the path of each file,
@@ -312,15 +310,19 @@ class JavaSparkContext(val sc: SparkContext)
*
* @note Small files are preferred; very large files may cause bad performance.
*/
@Experimental
def binaryFiles(path: String): JavaPairRDD[String, PortableDataStream] =
new JavaPairRDD(sc.binaryFiles(path, defaultMinPartitions))

/**
* :: Experimental ::
*
* Load data from a flat binary file, assuming the length of each record is constant.
*
* @param path Directory to the input data files
* @return An RDD of data with values, represented as byte arrays
*/
@Experimental
def binaryRecords(path: String, recordLength: Int): JavaRDD[Array[Byte]] = {
new JavaRDD(sc.binaryRecords(path, recordLength))
}
45 changes: 29 additions & 16 deletions core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -21,6 +21,8 @@ import java.io._
import java.net._
import java.util.{List => JList, ArrayList => JArrayList, Map => JMap, Collections}

import org.apache.spark.input.PortableDataStream

import scala.collection.JavaConversions._
import scala.collection.mutable
import scala.language.existentials
@@ -395,22 +397,33 @@ private[spark] object PythonRDD extends Logging {
newIter.asInstanceOf[Iterator[String]].foreach { str =>
writeUTF(str, dataOut)
}
case pair: Tuple2[_, _] =>
pair._1 match {
case bytePair: Array[Byte] =>
newIter.asInstanceOf[Iterator[Tuple2[Array[Byte], Array[Byte]]]].foreach { pair =>
dataOut.writeInt(pair._1.length)
dataOut.write(pair._1)
dataOut.writeInt(pair._2.length)
dataOut.write(pair._2)
}
case stringPair: String =>
newIter.asInstanceOf[Iterator[Tuple2[String, String]]].foreach { pair =>
writeUTF(pair._1, dataOut)
writeUTF(pair._2, dataOut)
}
case other =>
throw new SparkException("Unexpected Tuple2 element type " + pair._1.getClass)
case stream: PortableDataStream =>
newIter.asInstanceOf[Iterator[PortableDataStream]].foreach { stream =>
val bytes = stream.toArray()
dataOut.writeInt(bytes.length)
dataOut.write(bytes)
}
case (key: String, stream: PortableDataStream) =>
newIter.asInstanceOf[Iterator[(String, PortableDataStream)]].foreach {
case (key, stream) =>
writeUTF(key, dataOut)
val bytes = stream.toArray()
dataOut.writeInt(bytes.length)
dataOut.write(bytes)
}
case (key: String, value: String) =>
newIter.asInstanceOf[Iterator[(String, String)]].foreach {
case (key, value) =>
writeUTF(key, dataOut)
writeUTF(value, dataOut)
}
case (key: Array[Byte], value: Array[Byte]) =>
newIter.asInstanceOf[Iterator[(Array[Byte], Array[Byte])]].foreach {
case (key, value) =>
dataOut.writeInt(key.length)
dataOut.write(key)
dataOut.writeInt(value.length)
dataOut.write(value)
}
case other =>
throw new SparkException("Unexpected element type " + first.getClass)
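The new PortableDataStream branches above frame each value as a 4-byte length followed by the payload (DataOutputStream.writeInt is big-endian). As a sketch of what that framing looks like from the Python side — this mirrors, but is not, the actual pyspark deserializer code — reading it back could look like:

```
import struct

def read_framed(stream):
    # Consume records written as dataOut.writeInt(bytes.length); dataOut.write(bytes):
    # a 4-byte big-endian length prefix followed by that many payload bytes.
    while True:
        header = stream.read(4)
        if len(header) < 4:
            return
        (length,) = struct.unpack(">i", header)
        yield stream.read(length)
```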
32 changes: 31 additions & 1 deletion python/pyspark/context.py
@@ -29,7 +29,7 @@
from pyspark.files import SparkFiles
from pyspark.java_gateway import launch_gateway
from pyspark.serializers import PickleSerializer, BatchedSerializer, UTF8Deserializer, \
    PairDeserializer, CompressedSerializer, AutoBatchedSerializer
    PairDeserializer, CompressedSerializer, AutoBatchedSerializer, NoOpSerializer
from pyspark.storagelevel import StorageLevel
from pyspark.rdd import RDD
from pyspark.traceback_utils import CallSite, first_spark_call
@@ -388,6 +388,36 @@ def wholeTextFiles(self, path, minPartitions=None, use_unicode=True):
        return RDD(self._jsc.wholeTextFiles(path, minPartitions), self,
                   PairDeserializer(UTF8Deserializer(use_unicode), UTF8Deserializer(use_unicode)))

    def binaryFiles(self, path, minPartitions=None):
        """
        :: Experimental ::
        Read a directory of binary files from HDFS, a local file system
        (available on all nodes), or any Hadoop-supported file system URI
        as a byte array. Each file is read as a single record and returned
        in a key-value pair, where the key is the path of each file, the
        value is the content of each file.
        Note: Small files are preferred; large files are also allowed, but
        may cause bad performance.
        """
        minPartitions = minPartitions or self.defaultMinPartitions
        return RDD(self._jsc.binaryFiles(path, minPartitions), self,
                   PairDeserializer(UTF8Deserializer(), NoOpSerializer()))

    def binaryRecords(self, path, recordLength):
        """
        :: Experimental ::
        Load data from a flat binary file, assuming each record is a set of numbers
        with the specified numerical format (see ByteBuffer), and the number of
        bytes per record is constant.
        :param path: Directory to the input data files
        :param recordLength: The length at which to split the records
        """
        return RDD(self._jsc.binaryRecords(path, recordLength), self, NoOpSerializer())

    def _dictToJavaMap(self, d):
        jm = self._jvm.java.util.HashMap()
        if not d:
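Given the serializers chosen above, binaryFiles() yields (path, raw bytes) pairs whose values can be post-processed per file. A small sketch under an assumed file layout (a 4-byte big-endian count followed by that many 32-bit integers), with sc an existing SparkContext and a hypothetical input directory:

```
import io
import struct

def parse_packed(content):
    # Assumed layout: 4-byte big-endian count, then `count` big-endian int32 values.
    buf = io.BytesIO(content)
    (count,) = struct.unpack(">i", buf.read(4))
    return struct.unpack(">%di" % count, buf.read(4 * count))

parsed = sc.binaryFiles("hdfs:///data/packed").mapValues(parse_packed).collect()
```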
19 changes: 19 additions & 0 deletions python/pyspark/tests.py
@@ -1110,6 +1110,25 @@ def test_converters(self):
(u'\x03', [2.0])]
self.assertEqual(maps, em)

    def test_binary_files(self):
        path = os.path.join(self.tempdir.name, "binaryfiles")
        os.mkdir(path)
        data = "short binary data"
        with open(os.path.join(path, "part-0000"), 'w') as f:
            f.write(data)
        [(p, d)] = self.sc.binaryFiles(path).collect()
        self.assertTrue(p.endswith("part-0000"))
        self.assertEqual(d, data)

    def test_binary_records(self):
        path = os.path.join(self.tempdir.name, "binaryrecords")
        os.mkdir(path)
        with open(os.path.join(path, "part-0000"), 'w') as f:
            for i in range(100):
                f.write('%04d' % i)
        result = self.sc.binaryRecords(path, 4).map(int).collect()
        self.assertEqual(range(100), result)


class OutputFormatTests(ReusedPySparkTestCase):

