From d8e51f9dd21cdffb5f8eb1f6312b761529dbcb9b Mon Sep 17 00:00:00 2001 From: Ken Date: Tue, 8 Jul 2014 18:31:41 -0700 Subject: [PATCH 01/69] initial commit for pySparkStreaming --- bin/spark-submit | 6 + core/pom.xml | 2 +- .../apache/spark/api/python/PythonRDD.scala | 2 +- .../apache/spark/deploy/PythonRunner.scala | 1 + .../src/main/python/streaming/wordcount.py | 22 ++ python/pyspark/java_gateway.py | 3 + python/pyspark/streaming/__init__.py | 1 + python/pyspark/streaming/context.py | 133 ++++++++ python/pyspark/streaming/dstream.py | 315 ++++++++++++++++++ python/pyspark/streaming/duration.py | 171 ++++++++++ python/pyspark/streaming/jtime.py | 116 +++++++ python/pyspark/streaming/pyprint.py | 28 ++ python/pyspark/streaming/utils.py | 18 + streaming/pom.xml | 14 +- .../streaming/api/java/JavaDStreamLike.scala | 8 + .../streaming/api/python/PythonDStream.scala | 152 +++++++++ .../spark/streaming/dstream/DStream.scala | 68 +++- 17 files changed, 1050 insertions(+), 10 deletions(-) create mode 100644 examples/src/main/python/streaming/wordcount.py create mode 100644 python/pyspark/streaming/__init__.py create mode 100644 python/pyspark/streaming/context.py create mode 100644 python/pyspark/streaming/dstream.py create mode 100644 python/pyspark/streaming/duration.py create mode 100644 python/pyspark/streaming/jtime.py create mode 100644 python/pyspark/streaming/pyprint.py create mode 100644 python/pyspark/streaming/utils.py create mode 100644 streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala diff --git a/bin/spark-submit b/bin/spark-submit index 9e7cecedd0325..ac275b7696d5c 100755 --- a/bin/spark-submit +++ b/bin/spark-submit @@ -37,6 +37,12 @@ done DEPLOY_MODE=${DEPLOY_MODE:-"client"} +# Figure out which Python executable to use +if [[ -z "$PYSPARK_PYTHON" ]]; then + PYSPARK_PYTHON="python" +fi +export PYSPARK_PYTHON + if [ -n "$DRIVER_MEMORY" ] && [ $DEPLOY_MODE == "client" ]; then export SPARK_DRIVER_MEMORY=$DRIVER_MEMORY fi diff --git a/core/pom.xml b/core/pom.xml index 8c23842730e37..43633dcb63f54 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent - 1.1.0-SNAPSHOT + 1.0.0 ../pom.xml diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index f6570d335757a..e88a54d2086ea 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -252,7 +252,7 @@ private class PythonException(msg: String, cause: Exception) extends RuntimeExce * Form an RDD[(Array[Byte], Array[Byte])] from key-value pairs returned from Python. * This is used by PySpark's shuffle operations. */ -private class PairwiseRDD(prev: RDD[Array[Byte]]) extends +private[spark] class PairwiseRDD(prev: RDD[Array[Byte]]) extends RDD[(Long, Array[Byte])](prev) { override def getPartitions = prev.partitions override def compute(split: Partition, context: TaskContext) = diff --git a/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala b/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala index 0d6751f3fa6d2..89f3fd47724fe 100644 --- a/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala @@ -57,6 +57,7 @@ object PythonRunner { val builder = new ProcessBuilder(Seq(pythonExec, "-u", formattedPythonFile) ++ otherArgs) val env = builder.environment() env.put("PYTHONPATH", pythonPath) + env.put("PYSPARK_PYTHON", pythonExec) env.put("PYSPARK_GATEWAY_PORT", "" + gatewayServer.getListeningPort) builder.redirectErrorStream(true) // Ugly but needed for stdout and stderr to synchronize val process = builder.start() diff --git a/examples/src/main/python/streaming/wordcount.py b/examples/src/main/python/streaming/wordcount.py new file mode 100644 index 0000000000000..f44cd696894ba --- /dev/null +++ b/examples/src/main/python/streaming/wordcount.py @@ -0,0 +1,22 @@ +import sys +from operator import add + +from pyspark.streaming.context import StreamingContext +from pyspark.streaming.duration import * + +if __name__ == "__main__": + if len(sys.argv) != 2: + print >> sys.stderr, "Usage: wordcount " + exit(-1) + ssc = StreamingContext(appName="PythonStreamingWordCount", duration=Seconds(1)) + + lines = ssc.textFileStream(sys.argv[1]) + fm_lines = lines.flatMap(lambda x: x.split(" ")) + filtered_lines = fm_lines.filter(lambda line: "Spark" in line) + mapped_lines = fm_lines.map(lambda x: (x, 1)) + + fm_lines.pyprint() + filtered_lines.pyprint() + mapped_lines.pyprint() + ssc.start() + ssc.awaitTermination() diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py index 0dbead4415b02..7038c6422be47 100644 --- a/python/pyspark/java_gateway.py +++ b/python/pyspark/java_gateway.py @@ -82,6 +82,9 @@ def run(self): java_import(gateway.jvm, "org.apache.spark.SparkConf") java_import(gateway.jvm, "org.apache.spark.api.java.*") java_import(gateway.jvm, "org.apache.spark.api.python.*") + java_import(gateway.jvm, "org.apache.spark.streaming.*") + java_import(gateway.jvm, "org.apache.spark.streaming.api.java.*") + java_import(gateway.jvm, "org.apache.spark.streaming.api.python.*") java_import(gateway.jvm, "org.apache.spark.mllib.api.python.*") java_import(gateway.jvm, "org.apache.spark.sql.SQLContext") java_import(gateway.jvm, "org.apache.spark.sql.hive.HiveContext") diff --git a/python/pyspark/streaming/__init__.py b/python/pyspark/streaming/__init__.py new file mode 100644 index 0000000000000..719592912e80c --- /dev/null +++ b/python/pyspark/streaming/__init__.py @@ -0,0 +1 @@ +__author__ = 'ktakagiw' diff --git a/python/pyspark/streaming/context.py b/python/pyspark/streaming/context.py new file mode 100644 index 0000000000000..c8ae9c4af85c9 --- /dev/null +++ b/python/pyspark/streaming/context.py @@ -0,0 +1,133 @@ +__author__ = 'ktakagiw' + + +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import os +import shutil +import sys +from threading import Lock +from tempfile import NamedTemporaryFile + +from pyspark import accumulators +from pyspark.accumulators import Accumulator +from pyspark.broadcast import Broadcast +from pyspark.conf import SparkConf +from pyspark.files import SparkFiles +from pyspark.java_gateway import launch_gateway +from pyspark.serializers import PickleSerializer, BatchedSerializer, UTF8Deserializer +from pyspark.storagelevel import StorageLevel +from pyspark.rdd import RDD +from pyspark.context import SparkContext + +from py4j.java_collections import ListConverter + +from pyspark.streaming.dstream import DStream + +class StreamingContext(object): + """ + Main entry point for Spark functionality. A StreamingContext represents the + connection to a Spark cluster, and can be used to create L{RDD}s and + broadcast variables on that cluster. + """ + + def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None, + environment=None, batchSize=1024, serializer=PickleSerializer(), conf=None, + gateway=None, duration=None): + """ + Create a new StreamingContext. At least the master and app name and duration + should be set, either through the named parameters here or through C{conf}. + + @param master: Cluster URL to connect to + (e.g. mesos://host:port, spark://host:port, local[4]). + @param appName: A name for your job, to display on the cluster web UI. + @param sparkHome: Location where Spark is installed on cluster nodes. + @param pyFiles: Collection of .zip or .py files to send to the cluster + and add to PYTHONPATH. These can be paths on the local file + system or HDFS, HTTP, HTTPS, or FTP URLs. + @param environment: A dictionary of environment variables to set on + worker nodes. + @param batchSize: The number of Python objects represented as a single + Java object. Set 1 to disable batching or -1 to use an + unlimited batch size. + @param serializer: The serializer for RDDs. + @param conf: A L{SparkConf} object setting Spark properties. + @param gateway: Use an existing gateway and JVM, otherwise a new JVM + will be instatiated. + @param duration: A L{Duration} Duration for SparkStreaming + + """ + # Create the Python Sparkcontext + self._sc = SparkContext(master=master, appName=appName, sparkHome=sparkHome, + pyFiles=pyFiles, environment=environment, batchSize=batchSize, + serializer=serializer, conf=conf, gateway=gateway) + self._jvm = self._sc._jvm + self._jssc = self._initialize_context(self._sc._jsc, duration._jduration) + + # Initialize StremaingContext in function to allow subclass specific initialization + def _initialize_context(self, jspark_context, jduration): + return self._jvm.JavaStreamingContext(jspark_context, jduration) + + def actorStream(self, props, name, storageLevel, supervisorStrategy): + raise NotImplementedError + + def addStreamingListener(self, streamingListener): + raise NotImplementedError + + def awaitTermination(self, timeout=None): + if timeout: + self._jssc.awaitTermination(timeout) + else: + self._jssc.awaitTermination() + + def checkpoint(self, directory): + raise NotImplementedError + + def fileStream(self, directory, filter=None, newFilesOnly=None): + raise NotImplementedError + + def networkStream(self, receiver): + raise NotImplementedError + + def queueStream(self, queue, oneAtATime=True, defaultRDD=None): + raise NotImplementedError + + def rawSocketStream(self, hostname, port, storagelevel): + raise NotImplementedError + + def remember(self, duration): + raise NotImplementedError + + def socketStream(hostname, port, converter,storageLevel): + raise NotImplementedError + + def start(self): + self._jssc.start() + + def stop(self, stopSparkContext=True): + raise NotImplementedError + + def textFileStream(self, directory): + return DStream(self._jssc.textFileStream(directory), self, UTF8Deserializer()) + + def transform(self, seq): + raise NotImplementedError + + def union(self, seq): + raise NotImplementedError + diff --git a/python/pyspark/streaming/dstream.py b/python/pyspark/streaming/dstream.py new file mode 100644 index 0000000000000..b422b147d11e1 --- /dev/null +++ b/python/pyspark/streaming/dstream.py @@ -0,0 +1,315 @@ +from base64 import standard_b64encode as b64enc +import copy +from collections import defaultdict +from collections import namedtuple +from itertools import chain, ifilter, imap +import operator +import os +import sys +import shlex +import traceback +from subprocess import Popen, PIPE +from tempfile import NamedTemporaryFile +from threading import Thread +import warnings +import heapq +from random import Random + +from pyspark.serializers import NoOpSerializer, CartesianDeserializer, \ + BatchedSerializer, CloudPickleSerializer, PairDeserializer, pack_long +from pyspark.join import python_join, python_left_outer_join, \ + python_right_outer_join, python_cogroup +from pyspark.statcounter import StatCounter +from pyspark.rddsampler import RDDSampler +from pyspark.storagelevel import StorageLevel +#from pyspark.resultiterable import ResultIterable +from pyspark.rdd import _JavaStackTrace + +from py4j.java_collections import ListConverter, MapConverter + +__all__ = ["DStream"] + +class DStream(object): + def __init__(self, jdstream, ssc, jrdd_deserializer): + self._jdstream = jdstream + self._ssc = ssc + self.ctx = ssc._sc + self._jrdd_deserializer = jrdd_deserializer + + def generatedRDDs(self): + """ + // RDDs generated, marked as private[streaming] so that testsuites can access it + @transient + """ + pass + + def print_(self): + """ + """ + # print is a resrved name of Python. We cannot give print to function name + getattr(self._jdstream, "print")() + + def pyprint(self): + """ + """ + self._jdstream.pyprint() + + def cache(self): + """ + """ + raise NotImplementedError + + def checkpoint(self): + """ + """ + raise NotImplementedError + + def compute(self, time): + """ + """ + raise NotImplementedError + + def context(self): + """ + """ + raise NotImplementedError + + def count(self): + """ + """ + raise NotImplementedError + + def countByValue(self, numPartitions=None): + """ + """ + raise NotImplementedError + + def countByValueAndWindow(self, duration, slideDuration=None): + """ + """ + raise NotImplementedError + + def countByWindow(self, duration, slideDuration=None): + """ + """ + raise NotImplementedError + + def dstream(self): + """ + """ + raise NotImplementedError + + def filter(self, f): + """ + """ + def func(iterator): return ifilter(f, iterator) + return self.mapPartitions(func) + + def flatMap(self, f, preservesPartitioning=False): + """ + """ + def func(s, iterator): return chain.from_iterable(imap(f, iterator)) + return self.mapPartitionsWithIndex(func, preservesPartitioning) + + def foreachRDD(self, f, time): + """ + """ + raise NotImplementedError + + def glom(self): + """ + """ + raise NotImplementedError + + def map(self, f, preservesPartitioning=False): + """ + """ + def func(split, iterator): return imap(f, iterator) + return PipelinedDStream(self, func, preservesPartitioning) + + def mapPartitions(self, f): + """ + """ + def func(s, iterator): return f(iterator) + return self.mapPartitionsWithIndex(func) + + def perist(self, storageLevel): + """ + """ + raise NotImplementedError + + def reduce(self, func, numPartitions=None): + """ + + """ + return self._combineByKey(lambda x:x, func, func, numPartitions) + + def _combineByKey(self, createCombiner, mergeValue, mergeCombiners, + numPartitions = None): + """ + """ + if numPartitions is None: + numPartitions = self.ctx._defaultParallelism() + def combineLocally(iterator): + combiners = {} + for x in iterator: + (k, v) = x + if k not in combiners: + combiners[k] = createCombiner(v) + else: + combiners[k] = mergeValue(combiners[k], v) + return combiners.iteritems() + locally_combined = self.mapPartitions(combineLocally) + shuffled = locally_combined.partitionBy(numPartitions) + def _mergeCombiners(iterator): + combiners = {} + for (k, v) in iterator: + if not k in combiners: + combiners[k] = v + else: + combiners[k] = mergeCombiners(combiners[k], v) + return combiners.iteritems() + return shuffled.mapPartitions(_mergeCombiners) + + + def partitionBy(self, numPartitions, partitionFunc=None): + """ + Return a copy of the DStream partitioned using the specified partitioner. + + """ + if numPartitions is None: + numPartitions = self.ctx._defaultReducePartitions() + + if partitionFunc is None: + partitionFunc = lambda x: 0 if x is None else hash(x) + # Transferring O(n) objects to Java is too expensive. Instead, we'll + # form the hash buckets in Python, transferring O(numPartitions) objects + # to Java. Each object is a (splitNumber, [objects]) pair. + outputSerializer = self.ctx._unbatched_serializer + def add_shuffle_key(split, iterator): + + buckets = defaultdict(list) + + for (k, v) in iterator: + buckets[partitionFunc(k) % numPartitions].append((k, v)) + for (split, items) in buckets.iteritems(): + yield pack_long(split) + yield outputSerializer.dumps(items) + keyed = PipelinedDStream(self, add_shuffle_key) + keyed._bypass_serializer = True + with _JavaStackTrace(self.ctx) as st: + #JavaDStream + #pairRDD = self.ctx._jvm.PairwiseDStream(keyed._jdstream.dstream()).asJavaPairRDD() + pairDStream = self.ctx._jvm.PairwiseDStream(keyed._jdstream.dstream()).asJavaPairDStream() + partitioner = self.ctx._jvm.PythonPartitioner(numPartitions, + id(partitionFunc)) + jdstream = pairDStream.partitionBy(partitioner).values() + dstream = DStream(jdstream, self._ssc, BatchedSerializer(outputSerializer)) + # This is required so that id(partitionFunc) remains unique, even if + # partitionFunc is a lambda: + dstream._partitionFunc = partitionFunc + return dstream + + + + def reduceByWindow(self, reduceFunc, windowDuration, slideDuration, inReduceTunc): + """ + """ + + raise NotImplementedError + + def repartition(self, numPartitions): + """ + """ + raise NotImplementedError + + def slice(self, fromTime, toTime): + """ + """ + raise NotImplementedError + + def transform(self, transformFunc): + """ + """ + raise NotImplementedError + + def transformWith(self, other, transformFunc): + """ + """ + raise NotImplementedError + + def union(self, that): + """ + """ + raise NotImplementedError + + def window(self, windowDuration, slideDuration=None): + """ + """ + raise NotImplementedError + + def wrapRDD(self, rdd): + """ + """ + raise NotImplementedError + + def mapPartitionsWithIndex(self, f, preservesPartitioning=False): + return PipelinedDStream(self, f, preservesPartitioning) + + +class PipelinedDStream(DStream): + def __init__(self, prev, func, preservesPartitioning=False): + if not isinstance(prev, PipelinedDStream) or not prev._is_pipelinable(): + # This transformation is the first in its stage: + self.func = func + self.preservesPartitioning = preservesPartitioning + self._prev_jdstream = prev._jdstream + self._prev_jrdd_deserializer = prev._jrdd_deserializer + else: + prev_func = prev.func + def pipeline_func(split, iterator): + return func(split, prev_func(split, iterator)) + self.func = pipeline_func + self.preservesPartitioning = \ + prev.preservesPartitioning and preservesPartitioning + self._prev_jdstream = prev._prev_jdstream # maintain the pipeline + self._prev_jrdd_deserializer = prev._prev_jrdd_deserializer + self.is_cached = False + self.is_checkpointed = False + self._ssc = prev._ssc + self.ctx = prev.ctx + self.prev = prev + self._jdstream_val = None + self._jrdd_deserializer = self.ctx.serializer + self._bypass_serializer = False + + @property + def _jdstream(self): + if self._jdstream_val: + return self._jdstream_val + if self._bypass_serializer: + serializer = NoOpSerializer() + else: + serializer = self.ctx.serializer + + command = (self.func, self._prev_jrdd_deserializer, serializer) + pickled_command = CloudPickleSerializer().dumps(command) + broadcast_vars = ListConverter().convert( + [x._jbroadcast for x in self.ctx._pickled_broadcast_vars], + self.ctx._gateway._gateway_client) + self.ctx._pickled_broadcast_vars.clear() + class_tag = self._prev_jdstream.classTag() + env = MapConverter().convert(self.ctx.environment, + self.ctx._gateway._gateway_client) + includes = ListConverter().convert(self.ctx._python_includes, + self.ctx._gateway._gateway_client) + python_dstream = self.ctx._jvm.PythonDStream(self._prev_jdstream.dstream(), + bytearray(pickled_command), + env, includes, self.preservesPartitioning, + self.ctx.pythonExec, broadcast_vars, self.ctx._javaAccumulator, + class_tag) + self._jdstream_val = python_dstream.asJavaDStream() + return self._jdstream_val + + def _is_pipelinable(self): + return not (self.is_cached or self.is_checkpointed) diff --git a/python/pyspark/streaming/duration.py b/python/pyspark/streaming/duration.py new file mode 100644 index 0000000000000..ef1b4f6cef237 --- /dev/null +++ b/python/pyspark/streaming/duration.py @@ -0,0 +1,171 @@ +__author__ = 'ktakagiw' + +from pyspark.streaming import utils + +class Duration(object): + """ + Duration for Spark Streaming application. Used to set duration + + Most of the time, you would create a Duration object with + C{Duration()}, which will load values from C{spark.streaming.*} Java system + properties as well. In this case, any parameters you set directly on + the C{Duration} object take priority over system properties. + + """ + def __init__(self, millis, _jvm=None): + """ + Create new Duration. + + @param millis: milisecond + + """ + self._millis = millis + + from pyspark.context import SparkContext + SparkContext._ensure_initialized() + _jvm = _jvm or SparkContext._jvm + self._jduration = _jvm.Duration(millis) + + def toString(self): + """ Return duration as string """ + return str(self._millis) + " ms" + + def isZero(self): + """ Check if millis is zero """ + return self._millis == 0 + + def prettyPrint(self): + """ + Return a human-readable string representing a duration + """ + return utils.msDurationToString(self._millis) + + def milliseconds(self): + """ Return millisecond """ + return self._millis + + def toFormattedString(self): + """ Return millisecond """ + return str(self._millis) + + def max(self, other): + """ Return higher Duration """ + Duration._is_duration(other) + if self > other: + return self + else: + return other + + def min(self, other): + """ Return lower Durattion """ + Duration._is_duration(other) + if self < other: + return self + else: + return other + + def __str__(self): + return self.toString() + + def __add__(self, other): + """ Add Duration and Duration """ + Duration._is_duration(other) + return Duration(self._millis + other._millis) + + def __sub__(self, other): + """ Subtract Duration by Duration """ + Duration._is_duration(other) + return Duration(self._millis - other._millis) + + def __mul__(self, other): + """ Multiple Duration by Duration """ + Duration._is_duration(other) + return Duration(self._millis * other._millis) + + def __div__(self, other): + """ + Divide Duration by Duration + for Python 2.X + """ + Duration._is_duration(other) + return Duration(self._millis / other._millis) + + def __truediv__(self, other): + """ + Divide Duration by Duration + for Python 3.0 + """ + Duration._is_duration(other) + return Duration(self._millis / other._millis) + + def __floordiv__(self, other): + """ Divide Duration by Duration """ + Duration._is_duration(other) + return Duration(self._millis // other._millis) + + def __len__(self): + """ Length of miilisecond in Duration """ + return len(self._millis) + + def __lt__(self, other): + """ Duration < Duration """ + Duration._is_duration(other) + return self._millis < other._millis + + def __le__(self, other): + """ Duration <= Duration """ + Duration._is_duration(other) + return self.millis <= other._millis + + def __eq__(self, other): + """ Duration == Duration """ + Duration._is_duration(other) + return self._millis == other._millis + + def __ne__(self, other): + """ Duration != Duration """ + Duration._is_duration(other) + return self._millis != other._millis + + def __gt__(self, other): + """ Duration > Duration """ + Duration._is_duration(other) + return self._millis > other._millis + + def __ge__(self, other): + """ Duration >= Duration """ + Duration._is_duration(other) + return self._millis >= other._millis + + @classmethod + def _is_duration(self, instance): + """ is instance Duration """ + if not isinstance(instance, Duration): + raise TypeError("This should be Duration") + +def Milliseconds(milliseconds): + """ + Helper function that creates instance of [[pysparkstreaming.duration]] representing + a given number of milliseconds. + """ + return Duration(milliseconds) + +def Seconds(seconds): + """ + Helper function that creates instance of [[pysparkstreaming.duration]] representing + a given number of seconds. + """ + return Duration(seconds * 1000) + +def Minites(minites): + """ + Helper function that creates instance of [[pysparkstreaming.duration]] representing + a given number of minutes. + """ + return Duration(minutes * 60000) + +if __name__ == "__main__": + d = Duration(1) + print d + print d.milliseconds() + diff --git a/python/pyspark/streaming/jtime.py b/python/pyspark/streaming/jtime.py new file mode 100644 index 0000000000000..41670af659ea3 --- /dev/null +++ b/python/pyspark/streaming/jtime.py @@ -0,0 +1,116 @@ +__author__ = 'ktakagiw' + +from pyspark.streaming import utils +from pyspark.streaming.duration import Duration + +class Time(object): + """ + Time for Spark Streaming application. Used to set Time + + Most of the time, you would create a Duration object with + C{Time()}, which will load values from C{spark.streaming.*} Java system + properties as well. In this case, any parameters you set directly on + the C{Time} object take priority over system properties. + + """ + def __init__(self, millis, _jvm=None): + """ + Create new Time. + + @param millis: milisecond + + @param _jvm: internal parameter used to pass a handle to the + Java VM; does not need to be set by users + + """ + self._millis = millis + + from pyspark.context import StreamingContext + StreamingContext._ensure_initialized() + _jvm = _jvm or StreamingContext._jvm + self._jtime = _jvm.Time(millis) + + def toString(self): + """ Return time as string """ + return str(self._millis) + " ms" + + def milliseconds(self): + """ Return millisecond """ + return self._millis + + def max(self, other): + """ Return higher Time """ + Time._is_time(other) + if self > other: + return self + else: + return other + + def min(self, other): + """ Return lower Time """ + Time._is_time(other) + if self < other: + return self + else: + return other + + def __add__(self, other): + """ Add Time and Time """ + Duration._is_duration(other) + return Time(self._millis + other._millis) + + def __sub__(self, other): + """ Subtract Time by Duration or Time """ + if isinstance(other, Duration): + return Time(self._millis - other._millis) + elif isinstance(other, Time): + return Duration(self._mills, other._millis) + else: + raise TypeError + + def __lt__(self, other): + """ Time < Time """ + Time._is_time(other) + return self._millis < other._millis + + def __le__(self, other): + """ Time <= Time """ + Time._is_time(other) + return self.millis <= other._millis + + def __eq__(self, other): + """ Time == Time """ + Time._is_time(other) + return self._millis == other._millis + + def __ne__(self, other): + """ Time != Time """ + Time._is_time(other) + return self._millis != other._millis + + def __gt__(self, other): + """ Time > Time """ + Time._is_time(other) + return self._millis > other._millis + + def __ge__(self, other): + """ Time >= Time """ + Time._is_time(other) + return self._millis >= other._millis + + def isMultipbleOf(duration): + """ is multiple by Duration """ + Duration._is_duration(duration) + return self._millis % duration._millis == 0 + + def until(time, interval): + raise NotImplementedError + + def to(time, interval): + raise NotImplementedError + + @classmethod + def _is_time(self, instance): + """ is instance Time """ + if not isinstance(instance, Time): + raise TypeError diff --git a/python/pyspark/streaming/pyprint.py b/python/pyspark/streaming/pyprint.py new file mode 100644 index 0000000000000..fcdaca510812c --- /dev/null +++ b/python/pyspark/streaming/pyprint.py @@ -0,0 +1,28 @@ +import sys +from itertools import chain +from pyspark.serializers import PickleSerializer, BatchedSerializer, UTF8Deserializer + +def collect(binary_file_path): + dse = PickleSerializer() + with open(binary_file_path, 'rb') as tempFile: + for item in dse.load_stream(tempFile): + yield item +def main(): + try: + binary_file_path = sys.argv[1] + except: + print "Missed FilePath in argement" + + if not binary_file_path: + return + + counter = 0 + for rdd in chain.from_iterable(collect(binary_file_path)): + print rdd + counter = counter + 1 + if counter >= 10: + print "..." + break + +if __name__ =="__main__": + exit(main()) diff --git a/python/pyspark/streaming/utils.py b/python/pyspark/streaming/utils.py new file mode 100644 index 0000000000000..71aa3376c6578 --- /dev/null +++ b/python/pyspark/streaming/utils.py @@ -0,0 +1,18 @@ +__author__ = 'ktakagiw' + +def msDurationToString(ms): + """ + Returns a human-readable string representing a duration such as "35ms" + """ + second = 1000 + minute = 60 * second + hour = 60 * minute + + if ms < second: + return "%d ms" % ms + elif ms < minute: + return "%.1f s" % (float(ms) / second) + elif ms < hout: + return "%.1f m" % (float(ms) / minute) + else: + return "%.2f h" % (float(ms) / hour) diff --git a/streaming/pom.xml b/streaming/pom.xml index f506d6ce34a6f..88df63592efee 100644 --- a/streaming/pom.xml +++ b/streaming/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent - 1.1.0-SNAPSHOT + 1.0.0 ../pom.xml @@ -69,14 +69,14 @@ org.scalatest scalatest-maven-plugin - - diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala index a6184de4e83c1..cfa336df8674f 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala @@ -54,6 +54,14 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T dstream.print() } + /** + * Print the first ten elements of each PythonRDD generated in the PythonDStream. This is an output + * operator, so this PythonDStream will be registered as an output stream and there materialized. + * This function is for PythonAPI. + */ + + def pyprint() = dstream.pyprint() + /** * Return a new DStream in which each RDD has a single element generated by counting each RDD * of this DStream. diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala new file mode 100644 index 0000000000000..2d8b1e468dc4c --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.api.python + +import java.util.{List => JList, ArrayList => JArrayList, Map => JMap, Collections} + +import org.apache.spark.api.java.{JavaSparkContext, JavaPairRDD, JavaRDD} +import org.apache.spark.broadcast.Broadcast +import org.apache.spark._ +import org.apache.spark.util.Utils +import java.io._ +import scala.Some +import org.apache.spark.streaming.Duration +import scala.util.control.Breaks._ +import org.apache.spark.broadcast.Broadcast +import scala.Some +import org.apache.spark.streaming.Duration +import org.apache.spark.rdd.RDD +import org.apache.spark.api.python.PythonRDD + + +import org.apache.spark.streaming.{Duration, Time} +import org.apache.spark.streaming.dstream._ +import org.apache.spark.streaming.api.java._ +import org.apache.spark.rdd.RDD +import org.apache.spark.api.python._ +import org.apache.spark.api.python.PairwiseRDD + + +import scala.reflect.ClassTag + + +class PythonDStream[T: ClassTag]( + parent: DStream[T], + command: Array[Byte], + envVars: JMap[String, String], + pythonIncludes: JList[String], + preservePartitoning: Boolean, + pythonExec: String, + broadcastVars: JList[Broadcast[Array[Byte]]], + accumulator: Accumulator[JList[Array[Byte]]] + ) extends DStream[Array[Byte]](parent.ssc) { + + override def dependencies = List(parent) + + override def slideDuration: Duration = parent.slideDuration + + //pythonDStream compute + override def compute(validTime: Time): Option[RDD[Array[Byte]]] = { + parent.getOrCompute(validTime) match{ + case Some(rdd) => + val pythonRDD = new PythonRDD(rdd, command, envVars, pythonIncludes, preservePartitoning, pythonExec, broadcastVars, accumulator) + Some(pythonRDD.asJavaRDD.rdd) + case None => None + } + } + val asJavaDStream = JavaDStream.fromDStream(this) + + /** + * Print the first ten elements of each PythonRDD generated in this PythonDStream. This is an output + * operator, so this PythonDStream will be registered as an output stream and there materialized. + * Since serialized Python object is readable by Python, pyprint writes out binary data to + * temporary file and run python script to deserialized and print the first ten elements + */ + private[streaming] def ppyprint() { + def foreachFunc = (rdd: RDD[Array[Byte]], time: Time) => { + val iter = rdd.take(11).iterator + + // make a temporary file + val prefix = "spark" + val suffix = ".tmp" + val tempFile = File.createTempFile(prefix, suffix) + val tempFileStream = new DataOutputStream(new FileOutputStream(tempFile.getAbsolutePath)) + //write out serialized python object + PythonRDD.writeIteratorToStream(iter, tempFileStream) + tempFileStream.close() + + // This value has to be passed from python + val pythonExec = new ProcessBuilder().environment().get("PYSPARK_PYTHON") + val sparkHome = new ProcessBuilder().environment().get("SPARK_HOME") + //val pb = new ProcessBuilder(Seq(pythonExec, sparkHome + "/python/pyspark/streaming/pyprint.py", tempFile.getAbsolutePath())) // why this fails to compile??? + //absolute path to the python script is needed to change because we do not use pysparkstreaming + val pb = new ProcessBuilder(pythonExec, sparkHome + "/python/pysparkstreaming/streaming/pyprint.py", tempFile.getAbsolutePath) + val workerEnv = pb.environment() + + //envVars also need to be pass + //workerEnv.putAll(envVars) + val pythonPath = sparkHome + "/python/" + File.pathSeparator + workerEnv.get("PYTHONPATH") + workerEnv.put("PYTHONPATH", pythonPath) + val worker = pb.start() + val is = worker.getInputStream() + val isr = new InputStreamReader(is) + val br = new BufferedReader(isr) + + println ("-------------------------------------------") + println ("Time: " + time) + println ("-------------------------------------------") + + //print value from python std out + var line = "" + breakable { + while (true) { + line = br.readLine() + if (line == null) break() + println(line) + } + } + //delete temporary file + tempFile.delete() + println() + + } + new ForEachDStream(this, context.sparkContext.clean(foreachFunc)).register() + } +} + + +private class PairwiseDStream(prev:DStream[Array[Byte]]) extends +DStream[(Long, Array[Byte])](prev.ssc){ + override def dependencies = List(prev) + + override def slideDuration: Duration = prev.slideDuration + + override def compute(validTime:Time):Option[RDD[(Long, Array[Byte])]]={ + prev.getOrCompute(validTime) match{ + case Some(rdd)=>Some(rdd) + val pairwiseRDD = new PairwiseRDD(rdd) + Some(pairwiseRDD.asJavaPairRDD.rdd) + case None => None + } + } + val asJavaPairDStream : JavaPairDStream[Long, Array[Byte]] = JavaPairDStream(this) +} + + + + diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala index 4709a62381647..ffd7f88fd9dd1 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala @@ -18,11 +18,13 @@ package org.apache.spark.streaming.dstream -import java.io.{IOException, ObjectInputStream, ObjectOutputStream} +import java.io._ import scala.deprecated import scala.collection.mutable.HashMap import scala.reflect.ClassTag +import java.io.{IOException, ObjectInputStream, ObjectOutputStream} +import scala.util.control.Breaks._ import org.apache.spark.{Logging, SparkException} import org.apache.spark.rdd.{BlockRDD, RDD} @@ -31,6 +33,8 @@ import org.apache.spark.streaming._ import org.apache.spark.streaming.StreamingContext._ import org.apache.spark.streaming.scheduler.Job import org.apache.spark.util.MetadataCleaner +import org.apache.spark.streaming.Duration +import org.apache.spark.api.python.PythonRDD /** * A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous @@ -601,6 +605,68 @@ abstract class DStream[T: ClassTag] ( new ForEachDStream(this, context.sparkContext.clean(foreachFunc)).register() } + + + + + /** + * Print the first ten elements of each PythonRDD generated in this PythonDStream. This is an output + * operator, so this PythonDStream will be registered as an output stream and there materialized. + * Since serialized Python object is readable by Python, pyprint writes out binary data to + * temporary file and run python script to deserialized and print the first ten elements + */ + private[streaming] def pyprint() { + def foreachFunc = (rdd: RDD[T], time: Time) => { + val iter = rdd.take(11).iterator + + // make a temporary file + val prefix = "spark" + val suffix = ".tmp" + val tempFile = File.createTempFile(prefix, suffix) + val tempFileStream = new DataOutputStream(new FileOutputStream(tempFile.getAbsolutePath)) + //write out serialized python object + PythonRDD.writeIteratorToStream(iter, tempFileStream) + tempFileStream.close() + + // This value has to be passed from python + val pythonExec = new ProcessBuilder().environment().get("PYSPARK_PYTHON") + val sparkHome = new ProcessBuilder().environment().get("SPARK_HOME") + //val pb = new ProcessBuilder(Seq(pythonExec, sparkHome + "/python/pyspark/streaming/pyprint.py", tempFile.getAbsolutePath())) // why this fails to compile??? + //absolute path to the python script is needed to change because we do not use pysparkstreaming + val pb = new ProcessBuilder(pythonExec, sparkHome + "/python/pyspark/streaming/pyprint.py", tempFile.getAbsolutePath) + val workerEnv = pb.environment() + + //envVars also need to be pass + //workerEnv.putAll(envVars) + val pythonPath = sparkHome + "/python/" + File.pathSeparator + workerEnv.get("PYTHONPATH") + workerEnv.put("PYTHONPATH", pythonPath) + val worker = pb.start() + val is = worker.getInputStream() + val isr = new InputStreamReader(is) + val br = new BufferedReader(isr) + + println ("-------------------------------------------") + println ("Time: " + time) + println ("-------------------------------------------") + + //print value from python std out + var line = "" + breakable { + while (true) { + line = br.readLine() + if (line == null) break() + println(line) + } + } + //delete temporary file + tempFile.delete() + println() + + } + new ForEachDStream(this, context.sparkContext.clean(foreachFunc)).register() + } + + /** * Return a new DStream in which each RDD contains all the elements in seen in a * sliding window of time over this DStream. The new DStream generates RDDs with From 1367be52f80ee55a1b0cb1070b8fb02cf258c0be Mon Sep 17 00:00:00 2001 From: Ken Takagiwa Date: Tue, 15 Jul 2014 15:41:52 -0700 Subject: [PATCH 02/69] comment PythonDStream.PairwiseDStream --- .../apache/spark/streaming/api/python/PythonDStream.scala | 3 ++- .../scala/org/apache/spark/streaming/dstream/DStream.scala | 6 ++---- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala index 2d8b1e468dc4c..fe67250604d8e 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala @@ -129,7 +129,7 @@ class PythonDStream[T: ClassTag]( } } - +/* private class PairwiseDStream(prev:DStream[Array[Byte]]) extends DStream[(Long, Array[Byte])](prev.ssc){ override def dependencies = List(prev) @@ -146,6 +146,7 @@ DStream[(Long, Array[Byte])](prev.ssc){ } val asJavaPairDStream : JavaPairDStream[Long, Array[Byte]] = JavaPairDStream(this) } +*/ diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala index b24109074e816..d9d5446b62e9f 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala @@ -620,10 +620,7 @@ abstract class DStream[T: ClassTag] ( new ForEachDStream(this, context.sparkContext.clean(foreachFunc)).register() } - - - - +//TODO move pyprint to PythonDStream /** * Print the first ten elements of each PythonRDD generated in this PythonDStream. This is an output * operator, so this PythonDStream will be registered as an output stream and there materialized. @@ -644,6 +641,7 @@ abstract class DStream[T: ClassTag] ( tempFileStream.close() // This value has to be passed from python + // Python currently does not do cluster deployment. But what happened val pythonExec = new ProcessBuilder().environment().get("PYSPARK_PYTHON") val sparkHome = new ProcessBuilder().environment().get("SPARK_HOME") //val pb = new ProcessBuilder(Seq(pythonExec, sparkHome + "/python/pyspark/streaming/pyprint.py", tempFile.getAbsolutePath())) // why this fails to compile??? From 88068cf8439991b17c244d65af3192b49968583f Mon Sep 17 00:00:00 2001 From: Ken Takagiwa Date: Tue, 15 Jul 2014 17:19:20 -0700 Subject: [PATCH 03/69] modify dstream.py to fix indent error --- python/pyspark/streaming/dstream.py | 2 +- .../org/apache/spark/streaming/api/python/PythonDStream.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/python/pyspark/streaming/dstream.py b/python/pyspark/streaming/dstream.py index b422b147d11e1..a512517f6e437 100644 --- a/python/pyspark/streaming/dstream.py +++ b/python/pyspark/streaming/dstream.py @@ -172,7 +172,7 @@ def _mergeCombiners(iterator): return shuffled.mapPartitions(_mergeCombiners) - def partitionBy(self, numPartitions, partitionFunc=None): + def partitionBy(self, numPartitions, partitionFunc=None): """ Return a copy of the DStream partitioned using the specified partitioner. diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala index fe67250604d8e..389136f9e21a0 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala @@ -91,7 +91,7 @@ class PythonDStream[T: ClassTag]( tempFileStream.close() // This value has to be passed from python - val pythonExec = new ProcessBuilder().environment().get("PYSPARK_PYTHON") + //val pythonExec = new ProcessBuilder().environment().get("PYSPARK_PYTHON") val sparkHome = new ProcessBuilder().environment().get("SPARK_HOME") //val pb = new ProcessBuilder(Seq(pythonExec, sparkHome + "/python/pyspark/streaming/pyprint.py", tempFile.getAbsolutePath())) // why this fails to compile??? //absolute path to the python script is needed to change because we do not use pysparkstreaming From 94a07879007d6e6157b7f5b59a04284996f5623f Mon Sep 17 00:00:00 2001 From: Ken Takagiwa Date: Tue, 15 Jul 2014 21:08:43 -0700 Subject: [PATCH 04/69] added reducedByKey not working yet --- .../src/main/python/streaming/wordcount.py | 10 ++++++- python/pyspark/streaming/dstream.py | 27 +++++++++++++++++-- .../streaming/api/python/PythonDStream.scala | 6 ++--- 3 files changed, 37 insertions(+), 6 deletions(-) diff --git a/examples/src/main/python/streaming/wordcount.py b/examples/src/main/python/streaming/wordcount.py index f44cd696894ba..3996991109d60 100644 --- a/examples/src/main/python/streaming/wordcount.py +++ b/examples/src/main/python/streaming/wordcount.py @@ -1,6 +1,7 @@ import sys from operator import add +from pyspark.conf import SparkConf from pyspark.streaming.context import StreamingContext from pyspark.streaming.duration import * @@ -8,15 +9,22 @@ if len(sys.argv) != 2: print >> sys.stderr, "Usage: wordcount " exit(-1) - ssc = StreamingContext(appName="PythonStreamingWordCount", duration=Seconds(1)) + conf = SparkConf() + conf.setAppName("PythonStreamingWordCount") + conf.set("spark.default.parallelism", 1) + +# ssc = StreamingContext(appName="PythonStreamingWordCount", duration=Seconds(1)) + ssc = StreamingContext(conf=conf, duration=Seconds(1)) lines = ssc.textFileStream(sys.argv[1]) fm_lines = lines.flatMap(lambda x: x.split(" ")) filtered_lines = fm_lines.filter(lambda line: "Spark" in line) mapped_lines = fm_lines.map(lambda x: (x, 1)) + reduced_lines = mapped_lines.reduce(add) fm_lines.pyprint() filtered_lines.pyprint() mapped_lines.pyprint() + reduced_lines.pyprint() ssc.start() ssc.awaitTermination() diff --git a/python/pyspark/streaming/dstream.py b/python/pyspark/streaming/dstream.py index a512517f6e437..e144f8bc1cc09 100644 --- a/python/pyspark/streaming/dstream.py +++ b/python/pyspark/streaming/dstream.py @@ -29,6 +29,7 @@ __all__ = ["DStream"] + class DStream(object): def __init__(self, jdstream, ssc, jrdd_deserializer): self._jdstream = jdstream @@ -149,7 +150,7 @@ def _combineByKey(self, createCombiner, mergeValue, mergeCombiners, """ """ if numPartitions is None: - numPartitions = self.ctx._defaultParallelism() + numPartitions = self._defaultReducePartitions() def combineLocally(iterator): combiners = {} for x in iterator: @@ -211,7 +212,6 @@ def add_shuffle_key(split, iterator): return dstream - def reduceByWindow(self, reduceFunc, windowDuration, slideDuration, inReduceTunc): """ """ @@ -254,8 +254,31 @@ def wrapRDD(self, rdd): raise NotImplementedError def mapPartitionsWithIndex(self, f, preservesPartitioning=False): + """ + + """ return PipelinedDStream(self, f, preservesPartitioning) + def _defaultReducePartitions(self): + """ + + """ + # hard code to avoid the error + return 2 + if self.ctx._conf.contains("spark.default.parallelism"): + return self.ctx.defaultParallelism + else: + return self.getNumPartitions() + + def getNumPartitions(self): + """ + Returns the number of partitions in RDD + >>> rdd = sc.parallelize([1, 2, 3, 4], 2) + >>> rdd.getNumPartitions() + 2 + """ + return self._jdstream.partitions().size() + class PipelinedDStream(DStream): def __init__(self, prev, func, preservesPartitioning=False): diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala index 389136f9e21a0..719dd0a6a53c2 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala @@ -129,7 +129,7 @@ class PythonDStream[T: ClassTag]( } } -/* + private class PairwiseDStream(prev:DStream[Array[Byte]]) extends DStream[(Long, Array[Byte])](prev.ssc){ override def dependencies = List(prev) @@ -144,9 +144,9 @@ DStream[(Long, Array[Byte])](prev.ssc){ case None => None } } - val asJavaPairDStream : JavaPairDStream[Long, Array[Byte]] = JavaPairDStream(this) + val asJavaPairDStream : JavaPairDStream[Long, Array[Byte]] = JavaPairDStream.fromJavaDStream(this) } -*/ + From 69e9cd33a58b880f96cc9c3e5e62eaa415c49843 Mon Sep 17 00:00:00 2001 From: Ken Takagiwa Date: Wed, 16 Jul 2014 11:07:42 -0700 Subject: [PATCH 05/69] implementing transform function in Python --- python/pyspark/mllib/_common.py | 2 +- python/pyspark/streaming/dstream.py | 3 +- .../api/python/PythonTransformedDStream.scala | 37 +++++++++++++++++++ .../spark/streaming/dstream/DStream.scala | 3 ++ 4 files changed, 42 insertions(+), 3 deletions(-) create mode 100644 streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonTransformedDStream.scala diff --git a/python/pyspark/mllib/_common.py b/python/pyspark/mllib/_common.py index e609b60a0f968..4b723693f43e3 100644 --- a/python/pyspark/mllib/_common.py +++ b/python/pyspark/mllib/_common.py @@ -164,7 +164,7 @@ def _deserialize_double_vector(ba, offset=0): nb = len(ba) - offset if nb < 5: raise TypeError("_deserialize_double_vector called on a %d-byte array, " - "which is too short" % nb) + "which is too short" % nb) if ba[offset] == DENSE_VECTOR_MAGIC: return _deserialize_dense_vector(ba, offset) elif ba[offset] == SPARSE_VECTOR_MAGIC: diff --git a/python/pyspark/streaming/dstream.py b/python/pyspark/streaming/dstream.py index e144f8bc1cc09..3365c6d69c1a2 100644 --- a/python/pyspark/streaming/dstream.py +++ b/python/pyspark/streaming/dstream.py @@ -172,7 +172,6 @@ def _mergeCombiners(iterator): return combiners.iteritems() return shuffled.mapPartitions(_mergeCombiners) - def partitionBy(self, numPartitions, partitionFunc=None): """ Return a copy of the DStream partitioned using the specified partitioner. @@ -231,6 +230,7 @@ def slice(self, fromTime, toTime): def transform(self, transformFunc): """ """ + self._jdstream.transform(transformFunc) raise NotImplementedError def transformWith(self, other, transformFunc): @@ -264,7 +264,6 @@ def _defaultReducePartitions(self): """ # hard code to avoid the error - return 2 if self.ctx._conf.contains("spark.default.parallelism"): return self.ctx.defaultParallelism else: diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonTransformedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonTransformedDStream.scala new file mode 100644 index 0000000000000..ff70483b771a4 --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonTransformedDStream.scala @@ -0,0 +1,37 @@ +package org.apache.spark.streaming.api.python + +import org.apache.spark.Accumulator +import org.apache.spark.api.python.PythonRDD +import org.apache.spark.broadcast.Broadcast +import org.apache.spark.rdd.RDD +import org.apache.spark.streaming.api.java.JavaDStream +import org.apache.spark.streaming.{Time, Duration} +import org.apache.spark.streaming.dstream.DStream + +import scala.reflect.ClassTag + +/** + * Created by ken on 7/15/14. + */ +class PythonTransformedDStream[T: ClassTag]( + parents: Seq[DStream[T]], + command: Array[Byte], + envVars: JMap[String, String], + pythonIncludes: JList[String], + preservePartitoning: Boolean, + pythonExec: String, + broadcastVars: JList[Broadcast[Array[Byte]]], + accumulator: Accumulator[JList[Array[Byte]]] + ) extends DStream[Array[Byte]](parent.ssc) { + + override def dependencies = List(parent) + + override def slideDuration: Duration = parent.slideDuration + + //pythonDStream compute + override def compute(validTime: Time): Option[RDD[Array[Byte]]] = { + val parentRDDs = parents.map(_.getOrCompute(validTime).orNull).toSeq + Some() + } + val asJavaDStream = JavaDStream.fromDStream(this) +} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala index d9d5446b62e9f..67977244ef420 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala @@ -561,9 +561,12 @@ abstract class DStream[T: ClassTag] ( // because the DStream is reachable from the outer object here, and because // DStreams can't be serialized with closures, we can't proactively check // it for serializability and so we pass the optional false to SparkContext.clean + + // serialized python val cleanedF = context.sparkContext.clean(transformFunc, false) val realTransformFunc = (rdds: Seq[RDD[_]], time: Time) => { assert(rdds.length == 1) + // if transformfunc is fine, it is okay cleanedF(rdds.head.asInstanceOf[RDD[T]], time) } new TransformedDStream[U](Seq(this), realTransformFunc) From 72bfc66074b2f35224f116759e0a47204a138f24 Mon Sep 17 00:00:00 2001 From: Ken Takagiwa Date: Wed, 16 Jul 2014 11:12:53 -0700 Subject: [PATCH 06/69] modified the code base on comment in https://github.com/tdas/spark/pull/10 --- core/pom.xml | 2 +- python/pyspark/streaming/__init__.py | 1 - python/pyspark/streaming/context.py | 5 +---- 3 files changed, 2 insertions(+), 6 deletions(-) diff --git a/core/pom.xml b/core/pom.xml index a59fc9fc035d7..6abf8480d5da0 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent - 1.0.0 + 1.1.0-SNAPSHOT ../pom.xml diff --git a/python/pyspark/streaming/__init__.py b/python/pyspark/streaming/__init__.py index 719592912e80c..e69de29bb2d1d 100644 --- a/python/pyspark/streaming/__init__.py +++ b/python/pyspark/streaming/__init__.py @@ -1 +0,0 @@ -__author__ = 'ktakagiw' diff --git a/python/pyspark/streaming/context.py b/python/pyspark/streaming/context.py index c8ae9c4af85c9..40e9d98942e2e 100644 --- a/python/pyspark/streaming/context.py +++ b/python/pyspark/streaming/context.py @@ -1,6 +1,3 @@ -__author__ = 'ktakagiw' - - # # Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with @@ -41,7 +38,7 @@ class StreamingContext(object): """ - Main entry point for Spark functionality. A StreamingContext represents the + Main entry point for Spark Streaming functionality. A StreamingContext represents the connection to a Spark cluster, and can be used to create L{RDD}s and broadcast variables on that cluster. """ From a7a0b5ce72e9bad14880f2285544d11d725f0f14 Mon Sep 17 00:00:00 2001 From: Ken Takagiwa Date: Wed, 16 Jul 2014 11:17:02 -0700 Subject: [PATCH 07/69] add coment for hack why PYSPARK_PYTHON is needed in spark-submit --- bin/spark-submit | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/bin/spark-submit b/bin/spark-submit index ac275b7696d5c..fa022f707e572 100755 --- a/bin/spark-submit +++ b/bin/spark-submit @@ -37,6 +37,16 @@ done DEPLOY_MODE=${DEPLOY_MODE:-"client"} + +# This is a hack to make DStream.pyprint work. +# This will be removed after pyprint is moved to PythonDStream. +# Problem is that print function is in (Scala)DStream. +# Whenever python code is executed, we call PythonDStream which passes +# pythonExec(which python Spark should execute). +# Since pyprint is located in DStream, Spark does not know which python should use. +# In that case, get python path from PYSPARK_PYTHON, environmental variable. +# This fix is ongoing in print branch in my repo. + # Figure out which Python executable to use if [[ -z "$PYSPARK_PYTHON" ]]; then PYSPARK_PYTHON="python" From 0a516f5a31bfb5f5d3ac58139af820ad8bb50a5a Mon Sep 17 00:00:00 2001 From: Ken Takagiwa Date: Wed, 16 Jul 2014 11:19:13 -0700 Subject: [PATCH 08/69] add coment for hack why PYSPARK_PYTHON is needed in spark-submit --- bin/spark-submit | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bin/spark-submit b/bin/spark-submit index fa022f707e572..ec4e10787cff0 100755 --- a/bin/spark-submit +++ b/bin/spark-submit @@ -45,7 +45,7 @@ DEPLOY_MODE=${DEPLOY_MODE:-"client"} # pythonExec(which python Spark should execute). # Since pyprint is located in DStream, Spark does not know which python should use. # In that case, get python path from PYSPARK_PYTHON, environmental variable. -# This fix is ongoing in print branch in my repo. +# This fix is ongoing in print branch in https://github.com/giwa/spark/tree/print. # Figure out which Python executable to use if [[ -z "$PYSPARK_PYTHON" ]]; then From 57e3e52191464f6b8f8ec53a6452dcf86d4704a6 Mon Sep 17 00:00:00 2001 From: Ken Takagiwa Date: Wed, 16 Jul 2014 11:24:08 -0700 Subject: [PATCH 09/69] remove not implemented DStream functions in python --- python/pyspark/streaming/dstream.py | 102 ---------------------------- 1 file changed, 102 deletions(-) diff --git a/python/pyspark/streaming/dstream.py b/python/pyspark/streaming/dstream.py index a512517f6e437..6ab9c500450aa 100644 --- a/python/pyspark/streaming/dstream.py +++ b/python/pyspark/streaming/dstream.py @@ -54,50 +54,6 @@ def pyprint(self): """ self._jdstream.pyprint() - def cache(self): - """ - """ - raise NotImplementedError - - def checkpoint(self): - """ - """ - raise NotImplementedError - - def compute(self, time): - """ - """ - raise NotImplementedError - - def context(self): - """ - """ - raise NotImplementedError - - def count(self): - """ - """ - raise NotImplementedError - - def countByValue(self, numPartitions=None): - """ - """ - raise NotImplementedError - - def countByValueAndWindow(self, duration, slideDuration=None): - """ - """ - raise NotImplementedError - - def countByWindow(self, duration, slideDuration=None): - """ - """ - raise NotImplementedError - - def dstream(self): - """ - """ - raise NotImplementedError def filter(self, f): """ @@ -111,16 +67,6 @@ def flatMap(self, f, preservesPartitioning=False): def func(s, iterator): return chain.from_iterable(imap(f, iterator)) return self.mapPartitionsWithIndex(func, preservesPartitioning) - def foreachRDD(self, f, time): - """ - """ - raise NotImplementedError - - def glom(self): - """ - """ - raise NotImplementedError - def map(self, f, preservesPartitioning=False): """ """ @@ -133,11 +79,6 @@ def mapPartitions(self, f): def func(s, iterator): return f(iterator) return self.mapPartitionsWithIndex(func) - def perist(self, storageLevel): - """ - """ - raise NotImplementedError - def reduce(self, func, numPartitions=None): """ @@ -210,49 +151,6 @@ def add_shuffle_key(split, iterator): dstream._partitionFunc = partitionFunc return dstream - - - def reduceByWindow(self, reduceFunc, windowDuration, slideDuration, inReduceTunc): - """ - """ - - raise NotImplementedError - - def repartition(self, numPartitions): - """ - """ - raise NotImplementedError - - def slice(self, fromTime, toTime): - """ - """ - raise NotImplementedError - - def transform(self, transformFunc): - """ - """ - raise NotImplementedError - - def transformWith(self, other, transformFunc): - """ - """ - raise NotImplementedError - - def union(self, that): - """ - """ - raise NotImplementedError - - def window(self, windowDuration, slideDuration=None): - """ - """ - raise NotImplementedError - - def wrapRDD(self, rdd): - """ - """ - raise NotImplementedError - def mapPartitionsWithIndex(self, f, preservesPartitioning=False): return PipelinedDStream(self, f, preservesPartitioning) From c9d79dd381ee001eb5920ca865b5dc72f8b46a7f Mon Sep 17 00:00:00 2001 From: Ken Takagiwa Date: Wed, 16 Jul 2014 11:35:59 -0700 Subject: [PATCH 10/69] revert pom.xml --- python/pyspark/streaming/pyprint.py | 2 +- streaming/pom.xml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/python/pyspark/streaming/pyprint.py b/python/pyspark/streaming/pyprint.py index fcdaca510812c..6e87c985a57e3 100644 --- a/python/pyspark/streaming/pyprint.py +++ b/python/pyspark/streaming/pyprint.py @@ -1,6 +1,6 @@ import sys from itertools import chain -from pyspark.serializers import PickleSerializer, BatchedSerializer, UTF8Deserializer +from pyspark.serializers import PickleSerializer def collect(binary_file_path): dse = PickleSerializer() diff --git a/streaming/pom.xml b/streaming/pom.xml index 88df63592efee..2239ad9c8579c 100644 --- a/streaming/pom.xml +++ b/streaming/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent - 1.0.0 + 1.1.0-SNAPSHOT ../pom.xml From 8f8202b5c9bfccfb42f7027e7e8079b4b5807f02 Mon Sep 17 00:00:00 2001 From: Ken Takagiwa Date: Wed, 16 Jul 2014 11:38:26 -0700 Subject: [PATCH 11/69] revert streaming pom.xml --- streaming/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streaming/pom.xml b/streaming/pom.xml index 2239ad9c8579c..03102c5e836bf 100644 --- a/streaming/pom.xml +++ b/streaming/pom.xml @@ -76,7 +76,7 @@ are necessary - first one for 'mvn package', second one for 'mvn compile'. Ideally, 'mvn compile' should not compile test classes and therefore should not need this. However, an open Maven bug (http://jira.codehaus.org/browse/MNG-3559) - causes the compilation to fail if streaming test-jar is not generated. Hence, the + causes the compilation to fail if streaming test-jar is not generated. Hence, the second execution profile for 'mvn compile'. --> From fa4a7fc1b0643bfbe48b24e3897d65bce3332e64 Mon Sep 17 00:00:00 2001 From: Ken Takagiwa Date: Wed, 16 Jul 2014 11:44:14 -0700 Subject: [PATCH 12/69] revert streaming/pom.xml --- streaming/pom.xml | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/streaming/pom.xml b/streaming/pom.xml index 03102c5e836bf..f506d6ce34a6f 100644 --- a/streaming/pom.xml +++ b/streaming/pom.xml @@ -69,12 +69,12 @@ org.scalatest scalatest-maven-plugin - -