
Change sequencefile test data generator to easily be called from PySpark tests
MLnick committed Apr 21, 2014
1 parent 1d7c17c commit c0ebfb6
Showing 1 changed file with 75 additions and 65 deletions.
@@ -22,6 +22,7 @@ import org.apache.hadoop.io._
 import scala.Array
 import java.io.{DataOutput, DataInput}
 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat
+import org.apache.spark.api.java.JavaSparkContext
 
 /**
  * A class to test MsgPack serialization on the Scala side, that will be deserialized
@@ -33,6 +34,13 @@ import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat
 case class TestWritable(var str: String, var int: Int, var double: Double) extends Writable {
   def this() = this("", 0, 0.0)
 
+  def getStr = str
+  def setStr(str: String) { this.str = str }
+  def getInt = int
+  def setInt(int: Int) { this.int = int }
+  def getDouble = double
+  def setDouble(double: Double) { this.double = double }
+
   def write(out: DataOutput) = {
     out.writeUTF(str)
     out.writeInt(int)
@@ -47,81 +55,83 @@ case class TestWritable(var str: String, var int: Int, var double: Double) extends Writable {
 }
 
 /**
- * Main method to generate SequenceFile test data and write to the python 'test_support' directory.
- * Be sure to set the SPARK_HOME environment variable appropriately
+ * This object contains method to generate SequenceFile test data and write it to a
+ * given directory (probably a temp directory)
  */
 object WriteInputFormatTestDataGenerator extends App {
   import SparkContext._
 
-  val sc = new SparkContext("local[2]", "test")
+  def generateData(path: String, jsc: JavaSparkContext) {
+    val sc = jsc.sc //new SparkContext("local[2]", "test")
 
-  val sparkHome = sys.env.get("SPARK_HOME").orElse(sys.props.get("spark.home")).get
-  val basePath = s"$sparkHome/python/test_support/data/"
-  val textPath = s"$basePath/sftext/"
-  val intPath = s"$basePath/sfint/"
-  val doublePath = s"$basePath/sfdouble/"
-  val arrPath = s"$basePath/sfarray/"
-  val mapPath = s"$basePath/sfmap/"
-  val classPath = s"$basePath/sfclass/"
-  val bytesPath = s"$basePath/sfbytes/"
-  val boolPath = s"$basePath/sfbool"
-  val nullPath = s"$basePath/sfnull"
+    val basePath = s"$path/sftestdata/"
+    val textPath = s"$basePath/sftext/"
+    val intPath = s"$basePath/sfint/"
+    val doublePath = s"$basePath/sfdouble/"
+    val arrPath = s"$basePath/sfarray/"
+    val mapPath = s"$basePath/sfmap/"
+    val classPath = s"$basePath/sfclass/"
+    val bytesPath = s"$basePath/sfbytes/"
+    val boolPath = s"$basePath/sfbool/"
+    val nullPath = s"$basePath/sfnull/"
 
-  /*
-   * Create test data for IntWritable, DoubleWritable, Text, BytesWritable,
-   * BooleanWritable and NullWritable
-   */
-  val intKeys = Seq((1, "aa"), (2, "bb"), (2, "aa"), (3, "cc"), (2, "bb"), (1, "aa"))
-  sc.parallelize(intKeys).saveAsSequenceFile(intPath)
-  sc.parallelize(intKeys.map{ case (k, v) => (k.toDouble, v) }).saveAsSequenceFile(doublePath)
-  sc.parallelize(intKeys.map{ case (k, v) => (k.toString, v) }).saveAsSequenceFile(textPath)
-  sc.parallelize(intKeys.map{ case (k, v) => (k, v.getBytes) }).saveAsSequenceFile(bytesPath)
-  val bools = Seq((1, true), (2, true), (2, false), (3, true), (2, false), (1, false))
-  sc.parallelize(bools).saveAsSequenceFile(boolPath)
-  sc.parallelize(intKeys).map{ case (k, v) =>
-    (new IntWritable(k), NullWritable.get())
-  }.saveAsSequenceFile(nullPath)
+    /*
+     * Create test data for IntWritable, DoubleWritable, Text, BytesWritable,
+     * BooleanWritable and NullWritable
+     */
+    val intKeys = Seq((1, "aa"), (2, "bb"), (2, "aa"), (3, "cc"), (2, "bb"), (1, "aa"))
+    sc.parallelize(intKeys).saveAsSequenceFile(intPath)
+    sc.parallelize(intKeys.map{ case (k, v) => (k.toDouble, v) }).saveAsSequenceFile(doublePath)
+    sc.parallelize(intKeys.map{ case (k, v) => (k.toString, v) }).saveAsSequenceFile(textPath)
+    sc.parallelize(intKeys.map{ case (k, v) => (k, v.getBytes) }).saveAsSequenceFile(bytesPath)
+    val bools = Seq((1, true), (2, true), (2, false), (3, true), (2, false), (1, false))
+    sc.parallelize(bools).saveAsSequenceFile(boolPath)
+    sc.parallelize(intKeys).map{ case (k, v) =>
+      (new IntWritable(k), NullWritable.get())
+    }.saveAsSequenceFile(nullPath)
 
-  // Create test data for ArrayWritable
-  val data = Seq(
-    (1, Array(1.0, 2.0, 3.0)),
-    (2, Array(3.0, 4.0, 5.0)),
-    (3, Array(4.0, 5.0, 6.0))
-  )
-  sc.parallelize(data, numSlices = 2)
-    .map{ case (k, v) =>
-      (new IntWritable(k), new ArrayWritable(classOf[DoubleWritable], v.map(new DoubleWritable(_))))
-    }
-    .saveAsNewAPIHadoopFile[SequenceFileOutputFormat[IntWritable, ArrayWritable]](arrPath)
+    // Create test data for ArrayWritable
+    val data = Seq(
+      (1, Array(1.0, 2.0, 3.0)),
+      (2, Array(3.0, 4.0, 5.0)),
+      (3, Array(4.0, 5.0, 6.0))
+    )
+    sc.parallelize(data, numSlices = 2)
+      .map{ case (k, v) =>
+        (new IntWritable(k), new ArrayWritable(classOf[DoubleWritable], v.map(new DoubleWritable(_))))
+      }
+      .saveAsNewAPIHadoopFile[SequenceFileOutputFormat[IntWritable, ArrayWritable]](arrPath)
 
-  // Create test data for MapWritable, with keys DoubleWritable and values Text
-  val mapData = Seq(
-    (1, Map(2.0 -> "aa")),
-    (2, Map(3.0 -> "bb")),
-    (2, Map(1.0 -> "cc")),
-    (3, Map(2.0 -> "dd")),
-    (2, Map(1.0 -> "aa")),
-    (1, Map(3.0 -> "bb"))
-  )
-  sc.parallelize(mapData, numSlices = 2).map{ case (i, m) =>
-    val mw = new MapWritable()
-    val k = m.keys.head
-    val v = m.values.head
-    mw.put(new DoubleWritable(k), new Text(v))
-    (new IntWritable(i), mw)
-  }.saveAsSequenceFile(mapPath)
+    // Create test data for MapWritable, with keys DoubleWritable and values Text
+    val mapData = Seq(
+      (1, Map(2.0 -> "aa")),
+      (2, Map(3.0 -> "bb")),
+      (2, Map(1.0 -> "cc")),
+      (3, Map(2.0 -> "dd")),
+      (2, Map(1.0 -> "aa")),
+      (1, Map(3.0 -> "bb"))
+    )
+    sc.parallelize(mapData, numSlices = 2).map{ case (i, m) =>
+      val mw = new MapWritable()
+      val k = m.keys.head
+      val v = m.values.head
+      mw.put(new DoubleWritable(k), new Text(v))
+      (new IntWritable(i), mw)
+    }.saveAsSequenceFile(mapPath)
 
-  // Create test data for arbitrary custom writable TestWritable
-  val testClass = Seq(
-    ("1", TestWritable("test1", 123, 54.0)),
-    ("2", TestWritable("test2", 456, 8762.3)),
-    ("1", TestWritable("test3", 123, 423.1)),
-    ("3", TestWritable("test56", 456, 423.5)),
-    ("2", TestWritable("test2", 123, 5435.2))
-  )
-  val rdd = sc.parallelize(testClass, numSlices = 2).map{ case (k, v) => (new Text(k), v) }
-  rdd.saveAsNewAPIHadoopFile(classPath,
-    classOf[Text], classOf[TestWritable],
-    classOf[SequenceFileOutputFormat[Text, TestWritable]])
+    // Create test data for arbitrary custom writable TestWritable
+    val testClass = Seq(
+      ("1", TestWritable("test1", 123, 54.0)),
+      ("2", TestWritable("test2", 456, 8762.3)),
+      ("1", TestWritable("test3", 123, 423.1)),
+      ("3", TestWritable("test56", 456, 423.5)),
+      ("2", TestWritable("test2", 123, 5435.2))
+    )
+    val rdd = sc.parallelize(testClass, numSlices = 2).map{ case (k, v) => (new Text(k), v) }
+    rdd.saveAsNewAPIHadoopFile(classPath,
+      classOf[Text], classOf[TestWritable],
+      classOf[SequenceFileOutputFormat[Text, TestWritable]])
+  }
 
 }
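The intent of the change, in short: the generator is no longer driven as a standalone App that writes into a fixed python/test_support location under SPARK_HOME, but exposes a generateData method that a test harness can call with a target directory and an existing JavaSparkContext (the handle PySpark holds on the JVM side through the Py4J gateway). A minimal JVM-side driver might look like the sketch below; the driver object and the temp-directory handling are illustrative assumptions, not part of this commit.

import java.nio.file.Files
import org.apache.spark.SparkContext
import org.apache.spark.api.java.JavaSparkContext

object GenerateDataExample {
  def main(args: Array[String]) {
    // Wrap a local SparkContext, as PySpark does on the JVM side.
    val jsc = new JavaSparkContext(new SparkContext("local[2]", "test"))
    // Write the test SequenceFiles under a throwaway temp directory
    // instead of a fixed path under SPARK_HOME.
    val tempDir = Files.createTempDirectory("sftestdata").toString
    WriteInputFormatTestDataGenerator.generateData(tempDir, jsc)
    jsc.stop()
  }
}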

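For completeness, a hedged sketch of reading one of the generated files back on the Scala side, using the standard sc.sequenceFile API and its implicit Writable conversions. The base path below is a hypothetical example of where a harness might have pointed generateData; the generator itself appends sftestdata/ and the per-type subdirectories.

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

object ReadBackExample {
  def main(args: Array[String]) {
    val sc = new SparkContext("local[2]", "read-back")
    // Hypothetical location: <path passed to generateData>/sftestdata
    val base = "/tmp/pyspark-test/sftestdata"
    // The implicit WritableConverters turn (IntWritable, Text) records
    // back into plain (Int, String) pairs.
    val ints = sc.sequenceFile[Int, String](s"$base/sfint/").collect()
    ints.foreach { case (k, v) => println(s"$k -> $v") }
    sc.stop()
  }
}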