
Commit

broke something
giwa committed Aug 18, 2014
1 parent 9767712 commit 35933e1
Showing 5 changed files with 114 additions and 4 deletions.
10 changes: 6 additions & 4 deletions python/pyspark/streaming/context.py
@@ -169,21 +169,23 @@ def _testInputStream(self, test_inputs, numSlices=None):
jinput_stream = self._jvm.PythonTestInputStream(self._jssc,
jtempFiles,
numSlices).asJavaDStream()
return DStream(jinput_stream, self, PickleSerializer())

return DStream(jinput_stream, self, BatchedSerializer(PickleSerializer()))

def _testInputStream2(self, test_inputs, numSlices=None):
"""
This is inpired by QueStream implementation. Give list of RDD and generate DStream
which contain the RDD.
"""
test_rdds = list()
test_rdd_deserializers = list()
for test_input in test_inputs:
test_rdd = self._sc.parallelize(test_input, numSlices)
# debug: show how each test input was partitioned
print test_rdd.glom().collect()
test_rdds.append(test_rdd._jrdd)
test_rdd_deserializers.append(test_rdd._jrdd_deserializer)

jtest_rdds = ListConverter().convert(test_rdds, SparkContext._gateway._gateway_client)
jinput_stream = self._jvm.PythonTestInputStream2(self._jssc, jtest_rdds).asJavaDStream()

return DStream(jinput_stream, self, BatchedSerializer(PickleSerializer()))
dstream = DStream(jinput_stream, self, test_rdd_deserializers[0])
dstream._test_switch_dserializer(test_rdd_deserializers)
return dstream
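For orientation, a minimal sketch of how this helper could be driven from a test, assuming an already constructed StreamingContext ssc with a manual clock and start()/stop() methods mirroring the Scala API; only _testInputStream2, count and pyprint are taken from this patch, everything else is illustrative:

    # each inner list is served as one batch, QueueStream-style
    test_input = [[1, 2, 3], [4, 5], [6]]

    dstream = ssc._testInputStream2(test_input, numSlices=2)
    dstream.count().pyprint()   # expected per-batch counts: 3, 2, 1

    ssc.start()
    # ... advance the manual clock so each batch fires, then shut down ...
    ssc.stop()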
20 changes: 20 additions & 0 deletions python/pyspark/streaming/dstream.py
Expand Up @@ -17,6 +17,7 @@

from collections import defaultdict
from itertools import chain, ifilter, imap
import time
import operator

from pyspark.serializers import NoOpSerializer,\
@@ -289,6 +290,25 @@ def get_output(rdd, time):

self.foreachRDD(get_output)

def _test_switch_dserializer(self, serializer_que):
    """
    The deserializer is chosen dynamically based on numSlices and the
    number of inputs, so each batch may need a different deserializer.
    This switches the active deserializer for every batch, currently in
    plain FIFO order.
    """
    def switch(rdd, jtime):
        try:
            # make the next queued deserializer the active one for this batch
            self._jrdd_deserializer = serializer_que.pop(0)
        except IndexError:
            # queue exhausted; keep the current deserializer
            pass

    self.foreachRDD(switch)



# TODO: implement groupByKey
# TODO: implement union
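The queue exists because SparkContext.parallelize may pick a batched or an unbatched serializer for each input, depending on the input size and numSlices, so every test batch can arrive with a different deserializer. A minimal sketch of how such a FIFO queue is assembled, assuming a live SparkContext sc (illustrative only; _jrdd_deserializer is the same attribute this patch reads in context.py):

    inputs = [[1], range(1, 100), range(1, 5)]
    # collect one deserializer per test input, in batch order
    serializer_que = [sc.parallelize(x, 2)._jrdd_deserializer for x in inputs]
    print serializer_que
    # the DStream then pops these front-first, one per computed batch (FIFO)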
2 changes: 2 additions & 0 deletions python/pyspark/streaming_tests.py
@@ -118,6 +118,8 @@ def test_count(self):
test_input = [[], [1], range(1, 3), range(1, 4), range(1, 5)]

def test_func(dstream):
print "count"
dstream.count().pyprint()
return dstream.count()
expected_output = map(lambda x: [len(x)], test_input)
output = self._run_stream(test_input, test_func, expected_output)
11 changes: 11 additions & 0 deletions python/pyspark/worker.py
@@ -23,6 +23,7 @@
import time
import socket
import traceback
import itertools
# CloudPickler needs to be imported so that depicklers are registered using the
# copy_reg module.
from pyspark.accumulators import _accumulatorRegistry
@@ -76,6 +77,16 @@ def main(infile, outfile):
(func, deserializer, serializer) = command
init_time = time.time()
iterator = deserializer.load_stream(infile)
print "deserializer in worker: %s" % str(deserializer)
iterator, walk = itertools.tee(iterator)
if isinstance(walk, int):
print "this is int"
print walk
else:
try:
print list(walk)
except:
print list(walk)
serializer.dump_stream(func(split_index, iterator), outfile)
except Exception:
try:
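The tee-based debugging above works because itertools.tee lets the worker print a copy of the stream without consuming the iterator that func() reads afterwards. A self-contained sketch of the pattern (not part of the worker protocol):

    import itertools

    stream = iter(["record-1", "record-2", "record-3"])
    stream, walk = itertools.tee(stream)
    print list(walk)    # exhausts only the debug copy
    print list(stream)  # the original records are still available
    # note: everything printed from walk stays buffered until stream
    # catches up, so this is only suitable for small test inputs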
@@ -134,3 +134,78 @@ class PythonTransformedDStream(
}
*/

/**
 * This is an input stream used only for unit tests. It is equivalent to a
 * checkpointable, replayable, reliable message queue like Kafka. It requires
 * a sequence as input and returns the i-th element at the i-th batch under a
 * manual clock.
 */
class PythonTestInputStream(ssc_ : JavaStreamingContext, inputFiles: JArrayList[String], numPartitions: Int)
extends InputDStream[Array[Byte]](JavaStreamingContext.toStreamingContext(ssc_)) {

def start() {}

def stop() {}

def compute(validTime: Time): Option[RDD[Array[Byte]]] = {
logInfo("Computing RDD for time " + validTime)
inputFiles.foreach(logInfo(_))
// create an empty temporary file to fall back on when there is no
// input for this batch
val prefix = "spark"
val suffix = ".tmp"
val tempFile = File.createTempFile(prefix, suffix)
val index = ((validTime - zeroTime) / slideDuration - 1).toInt
logInfo("Index: " + index)

val selectedInputFile: String = {
if (inputFiles.isEmpty) {
tempFile.getAbsolutePath
} else if (index < inputFiles.size()) {
inputFiles.get(index)
} else {
tempFile.getAbsolutePath
}
}
val rdd = PythonRDD.readRDDFromFile(JavaSparkContext.fromSparkContext(ssc_.sparkContext), selectedInputFile, numPartitions).rdd
logInfo("Created RDD " + rdd.id + " with " + selectedInputFile)
Some(rdd)
}

val asJavaDStream = JavaDStream.fromDStream(this)
}

/**
 * This is an input stream used only for unit tests. It is equivalent to a
 * checkpointable, replayable, reliable message queue like Kafka. It requires
 * a sequence as input and returns the i-th element at the i-th batch under a
 * manual clock.
 * This implementation is close to QueueStream.
 */

class PythonTestInputStream2(ssc_ : JavaStreamingContext, inputRDDs: JArrayList[JavaRDD[Array[Byte]]])
extends InputDStream[Array[Byte]](JavaStreamingContext.toStreamingContext(ssc_)) {

def start() {}

def stop() {}

def compute(validTime: Time): Option[RDD[Array[Byte]]] = {
val emptyRDD = ssc.sparkContext.emptyRDD[Array[Byte]]
val index = ((validTime - zeroTime) / slideDuration - 1).toInt
val selectedRDD = {
if (inputRDDs.isEmpty) {
emptyRDD
} else if (index < inputRDDs.size()) {
inputRDDs.get(index).rdd
} else {
emptyRDD
}
}

Some(selectedRDD)
}

val asJavaDStream = JavaDStream.fromDStream(this)
}
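To make the replay semantics concrete, here is a small Python sketch (not part of the patch) of the batch selection rule both test input streams share, using illustrative millisecond times and a one-second slide duration:

    def select(valid_time, zero_time, slide_duration, inputs, empty):
        # same rule as compute() above: the i-th batch replays the i-th
        # input, and anything past the end of the queue yields an empty batch
        index = (valid_time - zero_time) // slide_duration - 1
        if not inputs or index >= len(inputs):
            return empty
        return inputs[index]

    batches = ["rdd-0", "rdd-1"]
    for t in (1000, 2000, 3000):
        print t, select(t, 0, 1000, batches, "empty-rdd")
    # 1000 rdd-0
    # 2000 rdd-1
    # 3000 empty-rdd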
