diff --git a/python/pyspark/streaming/context.py b/python/pyspark/streaming/context.py index 1668bfcd41a57..96d717cfcc75c 100644 --- a/python/pyspark/streaming/context.py +++ b/python/pyspark/streaming/context.py @@ -142,9 +142,49 @@ def stop(self, stopSparkContext=True, stopGraceFully=False): def _testInputStream(self, test_inputs, numSlices=None): """ +<<<<<<< HEAD This function is only for test. This implementation is inspired by QueStream implementation. Give list of RDD to generate DStream which contains the RDD. +======= + Generate multiple files to make "stream" in Scala side for test. + Scala chooses one of the files and generates RDD using PythonRDD.readRDDFromFile. + + QueStream maybe good way to implement this function + """ + numSlices = numSlices or self._sc.defaultParallelism + # Calling the Java parallelize() method with an ArrayList is too slow, + # because it sends O(n) Py4J commands. As an alternative, serialized + # objects are written to a file and loaded through textFile(). + + tempFiles = list() + for test_input in test_inputs: + tempFile = NamedTemporaryFile(delete=False, dir=self._sc._temp_dir) + + # Make sure we distribute data evenly if it's smaller than self.batchSize + if "__len__" not in dir(test_input): + test_input = list(test_input) # Make it a list so we can compute its length + batchSize = min(len(test_input) // numSlices, self._sc._batchSize) + if batchSize > 1: + serializer = BatchedSerializer(self._sc._unbatched_serializer, + batchSize) + else: + serializer = self._sc._unbatched_serializer + serializer.dump_stream(test_input, tempFile) + tempFile.close() + tempFiles.append(tempFile.name) + + jtempFiles = ListConverter().convert(tempFiles, SparkContext._gateway._gateway_client) + jinput_stream = self._jvm.PythonTestInputStream(self._jssc, + jtempFiles, + numSlices).asJavaDStream() + return DStream(jinput_stream, self, BatchedSerializer(PickleSerializer())) + + def _testInputStream2(self, test_inputs, numSlices=None): + """ + This is inpired by QueStream implementation. Give list of RDD and generate DStream + which contain the RDD. +>>>>>>> broke something """ test_rdds = list() test_rdd_deserializers = list() @@ -156,4 +196,10 @@ def _testInputStream(self, test_inputs, numSlices=None): jtest_rdds = ListConverter().convert(test_rdds, SparkContext._gateway._gateway_client) jinput_stream = self._jvm.PythonTestInputStream(self._jssc, jtest_rdds).asJavaDStream() +<<<<<<< HEAD return DStream(jinput_stream, self, test_rdd_deserializers[0]) +======= + dstream = DStream(jinput_stream, self, test_rdd_deserializers[0]) + dstream._test_switch_dserializer(test_rdd_deserializers) + return dstream +>>>>>>> broke something diff --git a/python/pyspark/streaming/dstream.py b/python/pyspark/streaming/dstream.py index f3e80b06a9790..32d6c3e45eb54 100644 --- a/python/pyspark/streaming/dstream.py +++ b/python/pyspark/streaming/dstream.py @@ -17,6 +17,7 @@ from collections import defaultdict from itertools import chain, ifilter, imap +import time import operator from pyspark.serializers import NoOpSerializer,\ @@ -428,20 +429,6 @@ def saveAsTextFile(rdd, time): # TODO: implemtnt rightOuterJoin -# TODO: implement groupByKey -# TODO: impelment union -# TODO: implement cache -# TODO: implement persist -# TODO: implement repertitions -# TODO: implement saveAsTextFile -# TODO: implement cogroup -# TODO: implement join -# TODO: implement countByValue -# TODO: implement leftOuterJoin -# TODO: implemtnt rightOuterJoin - - - class PipelinedDStream(DStream): def __init__(self, prev, func, preservesPartitioning=False): if not isinstance(prev, PipelinedDStream) or not prev._is_pipelinable(): diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py index 90ea7b453d401..78e56143fb3ab 100644 --- a/python/pyspark/worker.py +++ b/python/pyspark/worker.py @@ -86,6 +86,16 @@ def main(infile, outfile): (func, deserializer, serializer) = command init_time = time.time() iterator = deserializer.load_stream(infile) + print "deserializer in worker: %s" % str(deserializer) + iterator, walk = itertools.tee(iterator) + if isinstance(walk, int): + print "this is int" + print walk + else: + try: + print list(walk) + except: + print list(walk) serializer.dump_stream(func(split_index, iterator), outfile) except Exception: try: diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala index 133704c115419..63b2f709df7e4 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala @@ -206,3 +206,78 @@ class PythonTransformedDStream( } */ +<<<<<<< HEAD +======= +/** + * This is a input stream just for the unitest. This is equivalent to a checkpointable, + * replayable, reliable message queue like Kafka. It requires a sequence as input, and + * returns the i_th element at the i_th batch under manual clock. + */ +class PythonTestInputStream(ssc_ : JavaStreamingContext, inputFiles: JArrayList[String], numPartitions: Int) + extends InputDStream[Array[Byte]](JavaStreamingContext.toStreamingContext(ssc_)){ + + def start() {} + + def stop() {} + + def compute(validTime: Time): Option[RDD[Array[Byte]]] = { + logInfo("Computing RDD for time " + validTime) + inputFiles.foreach(logInfo(_)) + // make a temporary file + // make empty RDD + val prefix = "spark" + val suffix = ".tmp" + val tempFile = File.createTempFile(prefix, suffix) + val index = ((validTime - zeroTime) / slideDuration - 1).toInt + logInfo("Index: " + index) + + val selectedInputFile: String = { + if (inputFiles.isEmpty){ + tempFile.getAbsolutePath + }else if (index < inputFiles.size()) { + inputFiles.get(index) + } else { + tempFile.getAbsolutePath + } + } + val rdd = PythonRDD.readRDDFromFile(JavaSparkContext.fromSparkContext(ssc_.sparkContext), selectedInputFile, numPartitions).rdd + logInfo("Created RDD " + rdd.id + " with " + selectedInputFile) + Some(rdd) + } + + val asJavaDStream = JavaDStream.fromDStream(this) +} + +/** + * This is a input stream just for the unitest. This is equivalent to a checkpointable, + * replayable, reliable message queue like Kafka. It requires a sequence as input, and + * returns the i_th element at the i_th batch under manual clock. + * This implementation is close to QueStream + */ + +class PythonTestInputStream2(ssc_ : JavaStreamingContext, inputRDDs: JArrayList[JavaRDD[Array[Byte]]]) + extends InputDStream[Array[Byte]](JavaStreamingContext.toStreamingContext(ssc_)) { + + def start() {} + + def stop() {} + + def compute(validTime: Time): Option[RDD[Array[Byte]]] = { + val emptyRDD = ssc.sparkContext.emptyRDD[Array[Byte]] + val index = ((validTime - zeroTime) / slideDuration - 1).toInt + val selectedRDD = { + if (inputRDDs.isEmpty) { + emptyRDD + } else if (index < inputRDDs.size()) { + inputRDDs.get(index).rdd + } else { + emptyRDD + } + } + + Some(selectedRDD) + } + + val asJavaDStream = JavaDStream.fromDStream(this) +} +>>>>>>> broke something