diff --git a/python/pyspark/streaming/context.py b/python/pyspark/streaming/context.py index c4a1014ab9ab0..88e0cbbede1be 100644 --- a/python/pyspark/streaming/context.py +++ b/python/pyspark/streaming/context.py @@ -16,11 +16,10 @@ # from pyspark import RDD -from pyspark.serializers import UTF8Deserializer, BatchedSerializer +from pyspark.serializers import UTF8Deserializer from pyspark.context import SparkContext from pyspark.storagelevel import StorageLevel from pyspark.streaming.dstream import DStream -from pyspark.streaming.duration import Seconds from py4j.java_collections import ListConverter @@ -76,9 +75,6 @@ def __init__(self, sparkContext, duration): @param duration: A L{Duration} object or seconds for SparkStreaming. """ - if isinstance(duration, (int, long, float)): - duration = Seconds(duration) - self._sc = sparkContext self._jvm = self._sc._jvm self._start_callback_server() @@ -93,7 +89,10 @@ def _start_callback_server(self): gw._python_proxy_port = gw._callback_server.port # update port with real port def _initialize_context(self, sc, duration): - return self._jvm.JavaStreamingContext(sc._jsc, duration._jduration) + return self._jvm.JavaStreamingContext(sc._jsc, self._jduration(duration)) + + def _jduration(self, seconds): + return self._jvm.Duration(int(seconds * 1000)) @property def sparkContext(self): @@ -111,12 +110,12 @@ def start(self): def awaitTermination(self, timeout=None): """ Wait for the execution to stop. - @param timeout: time to wait in milliseconds + @param timeout: time to wait in seconds """ if timeout is None: self._jssc.awaitTermination() else: - self._jssc.awaitTermination(timeout) + self._jssc.awaitTermination(int(timeout * 1000)) def stop(self, stopSparkContext=True, stopGraceFully=False): """ @@ -139,10 +138,7 @@ def remember(self, duration): @param duration Minimum duration (in seconds) that each DStream should remember its RDDs """ - if isinstance(duration, (int, long, float)): - duration = Seconds(duration) - - self._jssc.remember(duration._jduration) + self._jssc.remember(self._jduration(duration)) def checkpoint(self, directory): """ diff --git a/python/pyspark/streaming/dstream.py b/python/pyspark/streaming/dstream.py index 9dd3556327477..8c79eece773ce 100644 --- a/python/pyspark/streaming/dstream.py +++ b/python/pyspark/streaming/dstream.py @@ -22,7 +22,6 @@ from pyspark.storagelevel import StorageLevel from pyspark.streaming.util import rddToFileName, RDDFunction, RDDFunction2 from pyspark.rdd import portable_hash -from pyspark.streaming.duration import Duration, Seconds from pyspark.resultiterable import ResultIterable __all__ = ["DStream"] @@ -334,10 +333,10 @@ def slice(self, begin, end): return [RDD(jrdd, self.ctx, self._jrdd_deserializer) for jrdd in jrdds] def window(self, windowDuration, slideDuration=None): - d = Seconds(windowDuration) + d = self._ssc._jduration(windowDuration) if slideDuration is None: return DStream(self._jdstream.window(d), self._ssc, self._jrdd_deserializer) - s = Seconds(slideDuration) + s = self._ssc._jduration(slideDuration) return DStream(self._jdstream.window(d, s), self._ssc, self._jrdd_deserializer) def reduceByWindow(self, reduceFunc, invReduceFunc, windowDuration, slideDuration): @@ -375,16 +374,12 @@ def invReduceFunc(a, b, t): joined = a.leftOuterJoin(b, numPartitions) return joined.mapValues(lambda (v1, v2): invFunc(v1, v2) if v2 is not None else v1) - if not isinstance(windowDuration, Duration): - windowDuration = Seconds(windowDuration) - if not isinstance(slideDuration, Duration): - slideDuration = Seconds(slideDuration) jreduceFunc = RDDFunction2(self.ctx, reduceFunc, reduced._jrdd_deserializer) jinvReduceFunc = RDDFunction2(self.ctx, invReduceFunc, reduced._jrdd_deserializer) dstream = self.ctx._jvm.PythonReducedWindowedDStream(reduced._jdstream.dstream(), jreduceFunc, jinvReduceFunc, - windowDuration._jduration, - slideDuration._jduration) + self._ssc._jduration(windowDuration), + self._ssc._jduration(slideDuration)) return DStream(dstream.asJavaDStream(), self._ssc, self.ctx.serializer) def updateStateByKey(self, updateFunc, numPartitions=None): diff --git a/python/pyspark/streaming/duration.py b/python/pyspark/streaming/duration.py deleted file mode 100644 index 8660f332a48da..0000000000000 --- a/python/pyspark/streaming/duration.py +++ /dev/null @@ -1,401 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - - -def msDurationToString(ms): - """ - Returns a human-readable string representing a duration such as "35ms" - - >> msDurationToString(10) - '10 ms' - >>> msDurationToString(1000) - '1.0 s' - >>> msDurationToString(60000) - '1.0 m' - >>> msDurationToString(3600000) - '1.00 h' - """ - second = 1000 - minute = 60 * second - hour = 60 * minute - - if ms < second: - return "%d ms" % ms - elif ms < minute: - return "%.1f s" % (float(ms) / second) - elif ms < hour: - return "%.1f m" % (float(ms) / minute) - else: - return "%.2f h" % (float(ms) / hour) - - -class Duration(object): - """ - Duration for Spark Streaming application. Used to set duration - - Most of the time, you would create a Duration object with - C{Duration()}, which will load values from C{spark.streaming.*} Java system - properties as well. In this case, any parameters you set directly on - the C{Duration} object take priority over system properties. - - """ - def __init__(self, millis, _jvm=None): - """ - Create new Duration. - - @param millis: milisecond - - """ - self._millis = millis - - from pyspark.context import SparkContext - SparkContext._ensure_initialized() - _jvm = _jvm or SparkContext._jvm - self._jduration = _jvm.Duration(millis) - - def toString(self): - """ - Return duration as string - - >>> d_10 = Duration(10) - >>> d_10.toString() - '10 ms' - """ - return str(self._millis) + " ms" - - def isZero(self): - """ - Check if millis is zero - - >>> d_10 = Duration(10) - >>> d_10.isZero() - False - >>> d_0 = Duration(0) - >>> d_0.isZero() - True - """ - return self._millis == 0 - - def prettyPrint(self): - """ - Return a human-readable string representing a duration - - >>> d_10 = Duration(10) - >>> d_10.prettyPrint() - '10 ms' - >>> d_1sec = Duration(1000) - >>> d_1sec.prettyPrint() - '1.0 s' - >>> d_1min = Duration(60 * 1000) - >>> d_1min.prettyPrint() - '1.0 m' - >>> d_1hour = Duration(60 * 60 * 1000) - >>> d_1hour.prettyPrint() - '1.00 h' - """ - return msDurationToString(self._millis) - - def milliseconds(self): - """ - Return millisecond - - >>> d_10 = Duration(10) - >>> d_10.milliseconds() - 10 - - """ - return self._millis - - def toFormattedString(self): - """ - Return millisecond - - >>> d_10 = Duration(10) - >>> d_10.toFormattedString() - '10' - - """ - return str(self._millis) - - def max(self, other): - """ - Return higher Duration - - >>> d_10 = Duration(10) - >>> d_100 = Duration(100) - >>> d_max = d_10.max(d_100) - >>> print d_max - 100 ms - - """ - Duration._is_duration(other) - if self > other: - return self - else: - return other - - def min(self, other): - """ - Return lower Durattion - - >>> d_10 = Duration(10) - >>> d_100 = Duration(100) - >>> d_min = d_10.min(d_100) - >>> print d_min - 10 ms - - """ - Duration._is_duration(other) - if self < other: - return self - else: - return other - - def __str__(self): - """ - >>> d_10 = Duration(10) - >>> str(d_10) - '10 ms' - - """ - return self.toString() - - def __add__(self, other): - """ - Add Duration and Duration - - >>> d_10 = Duration(10) - >>> d_100 = Duration(100) - >>> d_110 = d_10 + d_100 - >>> print d_110 - 110 ms - """ - Duration._is_duration(other) - return Duration(self._millis + other._millis) - - def __sub__(self, other): - """ - Subtract Duration by Duration - - >>> d_10 = Duration(10) - >>> d_100 = Duration(100) - >>> d_90 = d_100 - d_10 - >>> print d_90 - 90 ms - - """ - Duration._is_duration(other) - return Duration(self._millis - other._millis) - - def __mul__(self, other): - """ - Multiple Duration by Duration - - >>> d_10 = Duration(10) - >>> d_100 = Duration(100) - >>> d_1000 = d_10 * d_100 - >>> print d_1000 - 1000 ms - - """ - Duration._is_duration(other) - return Duration(self._millis * other._millis) - - def __div__(self, other): - """ - Divide Duration by Duration - for Python 2.X - - >>> d_10 = Duration(10) - >>> d_20 = Duration(20) - >>> d_2 = d_20 / d_10 - >>> print d_2 - 2 ms - - """ - Duration._is_duration(other) - return Duration(self._millis / other._millis) - - def __truediv__(self, other): - """ - Divide Duration by Duration - for Python 3.0 - - >>> d_10 = Duration(10) - >>> d_20 = Duration(20) - >>> d_2 = d_20 / d_10 - >>> print d_2 - 2 ms - - """ - Duration._is_duration(other) - return Duration(self._millis / other._millis) - - def __floordiv__(self, other): - """ - Divide Duration by Duration - - >>> d_10 = Duration(10) - >>> d_3 = Duration(3) - >>> d_3 = d_10 // d_3 - >>> print d_3 - 3 ms - - """ - Duration._is_duration(other) - return Duration(self._millis // other._millis) - - def __lt__(self, other): - """ - Duration < Duration - - >>> d_10 = Duration(10) - >>> d_20 = Duration(20) - >>> d_10 < d_20 - True - >>> d_20 < d_10 - False - - """ - Duration._is_duration(other) - return self._millis < other._millis - - def __le__(self, other): - """ - Duration <= Duration - - >>> d_10 = Duration(10) - >>> d_20 = Duration(20) - >>> d_10 <= d_20 - True - >>> d_20 <= d_10 - False - - """ - Duration._is_duration(other) - return self._millis <= other._millis - - def __eq__(self, other): - """ - Duration == Duration - - >>> d_10 = Duration(10) - >>> d_20 = Duration(20) - >>> d_10 == d_20 - False - >>> other_d_10 = Duration(10) - >>> d_10 == other_d_10 - True - - """ - Duration._is_duration(other) - return self._millis == other._millis - - def __ne__(self, other): - """ - Duration != Duration - - >>> d_10 = Duration(10) - >>> d_20 = Duration(20) - >>> d_10 != d_20 - True - >>> other_d_10 = Duration(10) - >>> d_10 != other_d_10 - False - - """ - Duration._is_duration(other) - return self._millis != other._millis - - def __gt__(self, other): - """ - Duration > Duration - - >>> d_10 = Duration(10) - >>> d_20 = Duration(20) - >>> d_10 > d_20 - False - >>> d_20 > d_10 - True - - """ - Duration._is_duration(other) - return self._millis > other._millis - - def __ge__(self, other): - """ - Duration >= Duration - - >>> d_10 = Duration(10) - >>> d_20 = Duration(20) - >>> d_10 < d_20 - True - >>> d_20 < d_10 - False - - - """ - Duration._is_duration(other) - return self._millis >= other._millis - - @classmethod - def _is_duration(self, instance): - """ is instance Duration """ - if not isinstance(instance, Duration): - raise TypeError("This should be Duration") - - -def Milliseconds(milliseconds): - """ - Helper function that creates instance of [[pysparkstreaming.duration]] representing - a given number of milliseconds. - - >>> milliseconds = Milliseconds(1) - >>> d_1 = Duration(1) - >>> milliseconds == d_1 - True - - """ - return Duration(milliseconds) - - -def Seconds(seconds): - """ - Helper function that creates instance of [[pysparkstreaming.duration]] representing - a given number of seconds. - - >>> seconds = Seconds(1) - >>> d_1sec = Duration(1000) - >>> seconds == d_1sec - True - - """ - return Duration(seconds * 1000) - - -def Minutes(minutes): - """ - Helper function that creates instance of [[pysparkstreaming.duration]] representing - a given number of minutes. - - >>> minutes = Minutes(1) - >>> d_1min = Duration(60 * 1000) - >>> minutes == d_1min - True - - """ - return Duration(minutes * 60 * 1000) diff --git a/python/pyspark/streaming/jtime.py b/python/pyspark/streaming/jtime.py deleted file mode 100644 index e157640afa4df..0000000000000 --- a/python/pyspark/streaming/jtime.py +++ /dev/null @@ -1,135 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -from pyspark.streaming.duration import Duration - -""" -The name of this file, time is not a good naming for python -because if we do import time when we want to use native python time package, it does -not import python time package. -""" -# TODO: add doctest - - -class Time(object): - """ - Time for Spark Streaming application. Used to set Time - - Most of the time, you would create a Duration object with - C{Time()}, which will load values from C{spark.streaming.*} Java system - properties as well. In this case, any parameters you set directly on - the C{Time} object take priority over system properties. - - """ - def __init__(self, millis, _jvm=None): - """ - Create new Time. - - @param millis: milisecond - - @param _jvm: internal parameter used to pass a handle to the - Java VM; does not need to be set by users - - """ - self._millis = millis - - from pyspark.context import StreamingContext - StreamingContext._ensure_initialized() - _jvm = _jvm or StreamingContext._jvm - self._jtime = _jvm.Time(millis) - - def toString(self): - """ Return time as string """ - return str(self._millis) + " ms" - - def milliseconds(self): - """ Return millisecond """ - return self._millis - - def max(self, other): - """ Return higher Time """ - Time._is_time(other) - if self > other: - return self - else: - return other - - def min(self, other): - """ Return lower Time """ - Time._is_time(other) - if self < other: - return self - else: - return other - - def __add__(self, other): - """ Add Time and Time """ - Duration._is_duration(other) - return Time(self._millis + other._millis) - - def __sub__(self, other): - """ Subtract Time by Duration or Time """ - if isinstance(other, Duration): - return Time(self._millis - other._millis) - elif isinstance(other, Time): - return Duration(self._millis, other._millis) - else: - raise TypeError - - def __lt__(self, other): - """ Time < Time """ - Time._is_time(other) - return self._millis < other._millis - - def __le__(self, other): - """ Time <= Time """ - Time._is_time(other) - return self._millis <= other._millis - - def __eq__(self, other): - """ Time == Time """ - Time._is_time(other) - return self._millis == other._millis - - def __ne__(self, other): - """ Time != Time """ - Time._is_time(other) - return self._millis != other._millis - - def __gt__(self, other): - """ Time > Time """ - Time._is_time(other) - return self._millis > other._millis - - def __ge__(self, other): - """ Time >= Time """ - Time._is_time(other) - return self._millis >= other._millis - - def isMultipbleOf(self, duration): - """ is multiple by Duration """ - Duration._is_duration(duration) - return self._millis % duration._millis == 0 - - @classmethod - def _is_time(self, instance): - """ is instance Time """ - if not isinstance(instance, Time): - raise TypeError - -# TODO: implement until -# TODO: implement to diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py index a585bbfa06f5b..1684da580f973 100644 --- a/python/pyspark/streaming/tests.py +++ b/python/pyspark/streaming/tests.py @@ -41,7 +41,7 @@ def setUp(self): self.sc = SparkContext(appName=class_name) self.sc.setCheckpointDir("/tmp") # TODO: decrease duration to speed up tests - self.ssc = StreamingContext(self.sc, duration=Seconds(1)) + self.ssc = StreamingContext(self.sc, duration=1) def tearDown(self): self.ssc.stop() @@ -315,7 +315,7 @@ def func(dstream): class TestStreamingContext(unittest.TestCase): def setUp(self): self.sc = SparkContext(master="local[2]", appName=self.__class__.__name__) - self.batachDuration = Seconds(0.1) + self.batachDuration = 0.1 self.ssc = StreamingContext(self.sc, self.batachDuration) def tearDown(self):