diff --git a/bin/spark-submit b/bin/spark-submit index 37e973a50b6fa..c557311b4b20e 100755 --- a/bin/spark-submit +++ b/bin/spark-submit @@ -48,7 +48,6 @@ export SPARK_SUBMIT_PROPERTIES_FILE=${SPARK_SUBMIT_PROPERTIES_FILE:-"$DEFAULT_PR # paths, library paths, java options and memory early on. Otherwise, it will # be too late by the time the driver JVM has started. -<<<<<<< HEAD if [[ "$SPARK_SUBMIT_DEPLOY_MODE" == "client" && -f "$SPARK_SUBMIT_PROPERTIES_FILE" ]]; then # Parse the properties file only if the special configs exist contains_special_configs=$( @@ -58,16 +57,6 @@ if [[ "$SPARK_SUBMIT_DEPLOY_MODE" == "client" && -f "$SPARK_SUBMIT_PROPERTIES_FI if [ -n "$contains_special_configs" ]; then export SPARK_SUBMIT_BOOTSTRAP_DRIVER=1 fi -======= -# Figure out which Python executable to use -if [[ -z "$PYSPARK_PYTHON" ]]; then - PYSPARK_PYTHON="python" -fi -export PYSPARK_PYTHON - -if [ -n "$DRIVER_MEMORY" ] && [ $DEPLOY_MODE == "client" ]; then - export SPARK_DRIVER_MEMORY=$DRIVER_MEMORY ->>>>>>> initial commit for pySparkStreaming fi exec "$SPARK_HOME"/bin/spark-class org.apache.spark.deploy.SparkSubmit "${ORIG_ARGS[@]}" diff --git a/core/pom.xml b/core/pom.xml index 7eb0b48eaeebd..2a81f6df289c0 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -21,11 +21,7 @@ org.apache.spark spark-parent -<<<<<<< HEAD 1.2.0-SNAPSHOT -======= - 1.0.0 ->>>>>>> initial commit for pySparkStreaming ../pom.xml diff --git a/examples/src/main/python/streaming/wordcount.py b/examples/src/main/python/streaming/wordcount.py index c19eb74c44ed6..2426345711086 100644 --- a/examples/src/main/python/streaming/wordcount.py +++ b/examples/src/main/python/streaming/wordcount.py @@ -1,10 +1,7 @@ import sys from operator import add -<<<<<<< HEAD from pyspark.conf import SparkConf -======= ->>>>>>> initial commit for pySparkStreaming from pyspark.streaming.context import StreamingContext from pyspark.streaming.duration import * @@ -12,7 +9,6 @@ if len(sys.argv) != 2: print >> sys.stderr, "Usage: wordcount " exit(-1) -<<<<<<< HEAD conf = SparkConf() conf.setAppName("PythonStreamingWordCount") @@ -24,17 +20,5 @@ count = mapped_words.reduceByKey(add) count.pyprint() -======= - ssc = StreamingContext(appName="PythonStreamingWordCount", duration=Seconds(1)) - - lines = ssc.textFileStream(sys.argv[1]) - fm_lines = lines.flatMap(lambda x: x.split(" ")) - filtered_lines = fm_lines.filter(lambda line: "Spark" in line) - mapped_lines = fm_lines.map(lambda x: (x, 1)) - - fm_lines.pyprint() - filtered_lines.pyprint() - mapped_lines.pyprint() ->>>>>>> initial commit for pySparkStreaming ssc.start() ssc.awaitTermination() diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py index 9b18696213691..f3c6d231ab777 100644 --- a/python/pyspark/java_gateway.py +++ b/python/pyspark/java_gateway.py @@ -108,16 +108,10 @@ def run(self): java_import(gateway.jvm, "org.apache.spark.SparkConf") java_import(gateway.jvm, "org.apache.spark.api.java.*") java_import(gateway.jvm, "org.apache.spark.api.python.*") -<<<<<<< HEAD java_import(gateway.jvm, "org.apache.spark.streaming.*") # do we need this? java_import(gateway.jvm, "org.apache.spark.streaming.api.java.*") java_import(gateway.jvm, "org.apache.spark.streaming.api.python.*") java_import(gateway.jvm, "org.apache.spark.streaming.dstream.*") # do we need this? -======= - java_import(gateway.jvm, "org.apache.spark.streaming.*") - java_import(gateway.jvm, "org.apache.spark.streaming.api.java.*") - java_import(gateway.jvm, "org.apache.spark.streaming.api.python.*") ->>>>>>> initial commit for pySparkStreaming java_import(gateway.jvm, "org.apache.spark.mllib.api.python.*") java_import(gateway.jvm, "org.apache.spark.sql.SQLContext") java_import(gateway.jvm, "org.apache.spark.sql.hive.HiveContext") diff --git a/python/pyspark/streaming/context.py b/python/pyspark/streaming/context.py index e65d2243ca7d5..3f455a3e06072 100644 --- a/python/pyspark/streaming/context.py +++ b/python/pyspark/streaming/context.py @@ -1,9 +1,3 @@ -<<<<<<< HEAD -======= -__author__ = 'ktakagiw' - - ->>>>>>> initial commit for pySparkStreaming # # Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with @@ -21,7 +15,6 @@ # limitations under the License. # -<<<<<<< HEAD import sys from signal import signal, SIGTERM, SIGINT @@ -36,43 +29,12 @@ class StreamingContext(object): """ Main entry point for Spark Streaming functionality. A StreamingContext represents the connection to a Spark cluster, and can be used to create L{DStream}s and -======= -import os -import shutil -import sys -from threading import Lock -from tempfile import NamedTemporaryFile - -from pyspark import accumulators -from pyspark.accumulators import Accumulator -from pyspark.broadcast import Broadcast -from pyspark.conf import SparkConf -from pyspark.files import SparkFiles -from pyspark.java_gateway import launch_gateway -from pyspark.serializers import PickleSerializer, BatchedSerializer, UTF8Deserializer -from pyspark.storagelevel import StorageLevel -from pyspark.rdd import RDD -from pyspark.context import SparkContext - -from py4j.java_collections import ListConverter - -from pyspark.streaming.dstream import DStream - -class StreamingContext(object): - """ - Main entry point for Spark functionality. A StreamingContext represents the - connection to a Spark cluster, and can be used to create L{RDD}s and ->>>>>>> initial commit for pySparkStreaming broadcast variables on that cluster. """ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None, environment=None, batchSize=1024, serializer=PickleSerializer(), conf=None, -<<<<<<< HEAD gateway=None, sparkContext=None, duration=None): -======= - gateway=None, duration=None): ->>>>>>> initial commit for pySparkStreaming """ Create a new StreamingContext. At least the master and app name and duration should be set, either through the named parameters here or through C{conf}. @@ -93,7 +55,6 @@ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None, @param conf: A L{SparkConf} object setting Spark properties. @param gateway: Use an existing gateway and JVM, otherwise a new JVM will be instatiated. -<<<<<<< HEAD @param sparkContext: L{SparkContext} object. @param duration: A L{Duration} object for SparkStreaming. @@ -112,15 +73,6 @@ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None, # is started in StreamingContext. SparkContext._gateway.restart_callback_server() self._clean_up_trigger() -======= - @param duration: A L{Duration} Duration for SparkStreaming - - """ - # Create the Python Sparkcontext - self._sc = SparkContext(master=master, appName=appName, sparkHome=sparkHome, - pyFiles=pyFiles, environment=environment, batchSize=batchSize, - serializer=serializer, conf=conf, gateway=gateway) ->>>>>>> initial commit for pySparkStreaming self._jvm = self._sc._jvm self._jssc = self._initialize_context(self._sc._jsc, duration._jduration) @@ -128,7 +80,6 @@ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None, def _initialize_context(self, jspark_context, jduration): return self._jvm.JavaStreamingContext(jspark_context, jduration) -<<<<<<< HEAD def _clean_up_trigger(self): """Kill py4j callback server properly using signal lib""" @@ -205,53 +156,3 @@ def _testInputStream(self, test_inputs, numSlices=None): jinput_stream = self._jvm.PythonTestInputStream(self._jssc, jtest_rdds).asJavaDStream() return DStream(jinput_stream, self, test_rdd_deserializers[0]) -======= - def actorStream(self, props, name, storageLevel, supervisorStrategy): - raise NotImplementedError - - def addStreamingListener(self, streamingListener): - raise NotImplementedError - - def awaitTermination(self, timeout=None): - if timeout: - self._jssc.awaitTermination(timeout) - else: - self._jssc.awaitTermination() - - def checkpoint(self, directory): - raise NotImplementedError - - def fileStream(self, directory, filter=None, newFilesOnly=None): - raise NotImplementedError - - def networkStream(self, receiver): - raise NotImplementedError - - def queueStream(self, queue, oneAtATime=True, defaultRDD=None): - raise NotImplementedError - - def rawSocketStream(self, hostname, port, storagelevel): - raise NotImplementedError - - def remember(self, duration): - raise NotImplementedError - - def socketStream(hostname, port, converter,storageLevel): - raise NotImplementedError - - def start(self): - self._jssc.start() - - def stop(self, stopSparkContext=True): - raise NotImplementedError - - def textFileStream(self, directory): - return DStream(self._jssc.textFileStream(directory), self, UTF8Deserializer()) - - def transform(self, seq): - raise NotImplementedError - - def union(self, seq): - raise NotImplementedError - ->>>>>>> initial commit for pySparkStreaming diff --git a/python/pyspark/streaming/dstream.py b/python/pyspark/streaming/dstream.py index eeb6b5644d1d3..224d2bbdeeb53 100644 --- a/python/pyspark/streaming/dstream.py +++ b/python/pyspark/streaming/dstream.py @@ -1,4 +1,3 @@ -<<<<<<< HEAD # # Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with @@ -27,50 +26,17 @@ from pyspark.resultiterable import ResultIterable from pyspark.streaming.utils import rddToFileName -======= -from base64 import standard_b64encode as b64enc -import copy -from collections import defaultdict -from collections import namedtuple -from itertools import chain, ifilter, imap -import operator -import os -import sys -import shlex -import traceback -from subprocess import Popen, PIPE -from tempfile import NamedTemporaryFile -from threading import Thread -import warnings -import heapq -from random import Random - -from pyspark.serializers import NoOpSerializer, CartesianDeserializer, \ - BatchedSerializer, CloudPickleSerializer, PairDeserializer, pack_long -from pyspark.join import python_join, python_left_outer_join, \ - python_right_outer_join, python_cogroup -from pyspark.statcounter import StatCounter -from pyspark.rddsampler import RDDSampler -from pyspark.storagelevel import StorageLevel -#from pyspark.resultiterable import ResultIterable -from pyspark.rdd import _JavaStackTrace ->>>>>>> initial commit for pySparkStreaming - from py4j.java_collections import ListConverter, MapConverter __all__ = ["DStream"] -<<<<<<< HEAD -======= ->>>>>>> initial commit for pySparkStreaming class DStream(object): def __init__(self, jdstream, ssc, jrdd_deserializer): self._jdstream = jdstream self._ssc = ssc self.ctx = ssc._sc self._jrdd_deserializer = jrdd_deserializer -<<<<<<< HEAD self.is_cached = False self.is_checkpointed = False @@ -106,81 +72,12 @@ def print_(self, label=None): def filter(self, f): """ Return a new DStream containing only the elements that satisfy predicate. -======= - - def generatedRDDs(self): - """ - // RDDs generated, marked as private[streaming] so that testsuites can access it - @transient - """ - pass - - def print_(self): - """ - """ - # print is a resrved name of Python. We cannot give print to function name - getattr(self._jdstream, "print")() - - def pyprint(self): - """ - """ - self._jdstream.pyprint() - - def cache(self): - """ - """ - raise NotImplementedError - - def checkpoint(self): - """ - """ - raise NotImplementedError - - def compute(self, time): - """ - """ - raise NotImplementedError - - def context(self): - """ - """ - raise NotImplementedError - - def count(self): - """ - """ - raise NotImplementedError - - def countByValue(self, numPartitions=None): - """ - """ - raise NotImplementedError - - def countByValueAndWindow(self, duration, slideDuration=None): - """ - """ - raise NotImplementedError - - def countByWindow(self, duration, slideDuration=None): - """ - """ - raise NotImplementedError - - def dstream(self): - """ - """ - raise NotImplementedError - - def filter(self, f): - """ ->>>>>>> initial commit for pySparkStreaming """ def func(iterator): return ifilter(f, iterator) return self.mapPartitions(func) def flatMap(self, f, preservesPartitioning=False): """ -<<<<<<< HEAD Pass each value in the key-value pair DStream through flatMap function without changing the keys: this also retains the original RDD's partition. """ @@ -239,51 +136,6 @@ def combineByKey(self, createCombiner, mergeValue, mergeCombiners, if numPartitions is None: numPartitions = self._defaultReducePartitions() -======= - """ - def func(s, iterator): return chain.from_iterable(imap(f, iterator)) - return self.mapPartitionsWithIndex(func, preservesPartitioning) - - def foreachRDD(self, f, time): - """ - """ - raise NotImplementedError - - def glom(self): - """ - """ - raise NotImplementedError - - def map(self, f, preservesPartitioning=False): - """ - """ - def func(split, iterator): return imap(f, iterator) - return PipelinedDStream(self, func, preservesPartitioning) - - def mapPartitions(self, f): - """ - """ - def func(s, iterator): return f(iterator) - return self.mapPartitionsWithIndex(func) - - def perist(self, storageLevel): - """ - """ - raise NotImplementedError - - def reduce(self, func, numPartitions=None): - """ - - """ - return self._combineByKey(lambda x:x, func, func, numPartitions) - - def _combineByKey(self, createCombiner, mergeValue, mergeCombiners, - numPartitions = None): - """ - """ - if numPartitions is None: - numPartitions = self.ctx._defaultParallelism() ->>>>>>> initial commit for pySparkStreaming def combineLocally(iterator): combiners = {} for x in iterator: @@ -295,10 +147,7 @@ def combineLocally(iterator): return combiners.iteritems() locally_combined = self.mapPartitions(combineLocally) shuffled = locally_combined.partitionBy(numPartitions) -<<<<<<< HEAD -======= ->>>>>>> initial commit for pySparkStreaming def _mergeCombiners(iterator): combiners = {} for (k, v) in iterator: @@ -307,43 +156,25 @@ def _mergeCombiners(iterator): else: combiners[k] = mergeCombiners(combiners[k], v) return combiners.iteritems() -<<<<<<< HEAD return shuffled.mapPartitions(_mergeCombiners) def partitionBy(self, numPartitions, partitionFunc=None): """ Return a copy of the DStream partitioned using the specified partitioner. -======= - return shuffled.mapPartitions(_mergeCombiners) - - - def partitionBy(self, numPartitions, partitionFunc=None): - """ - Return a copy of the DStream partitioned using the specified partitioner. - ->>>>>>> initial commit for pySparkStreaming """ if numPartitions is None: numPartitions = self.ctx._defaultReducePartitions() if partitionFunc is None: partitionFunc = lambda x: 0 if x is None else hash(x) -<<<<<<< HEAD -======= ->>>>>>> initial commit for pySparkStreaming # Transferring O(n) objects to Java is too expensive. Instead, we'll # form the hash buckets in Python, transferring O(numPartitions) objects # to Java. Each object is a (splitNumber, [objects]) pair. outputSerializer = self.ctx._unbatched_serializer -<<<<<<< HEAD def add_shuffle_key(split, iterator): -======= - def add_shuffle_key(split, iterator): - ->>>>>>> initial commit for pySparkStreaming buckets = defaultdict(list) for (k, v) in iterator: @@ -354,26 +185,16 @@ def add_shuffle_key(split, iterator): keyed = PipelinedDStream(self, add_shuffle_key) keyed._bypass_serializer = True with _JavaStackTrace(self.ctx) as st: -<<<<<<< HEAD partitioner = self.ctx._jvm.PythonPartitioner(numPartitions, id(partitionFunc)) jdstream = self.ctx._jvm.PythonPairwiseDStream(keyed._jdstream.dstream(), partitioner).asJavaDStream() -======= - #JavaDStream - #pairRDD = self.ctx._jvm.PairwiseDStream(keyed._jdstream.dstream()).asJavaPairRDD() - pairDStream = self.ctx._jvm.PairwiseDStream(keyed._jdstream.dstream()).asJavaPairDStream() - partitioner = self.ctx._jvm.PythonPartitioner(numPartitions, - id(partitionFunc)) - jdstream = pairDStream.partitionBy(partitioner).values() ->>>>>>> initial commit for pySparkStreaming dstream = DStream(jdstream, self._ssc, BatchedSerializer(outputSerializer)) # This is required so that id(partitionFunc) remains unique, even if # partitionFunc is a lambda: dstream._partitionFunc = partitionFunc return dstream -<<<<<<< HEAD def _defaultReducePartitions(self): """ Returns the default number of partitions to use during reduce tasks (e.g., groupBy). @@ -387,7 +208,6 @@ def _defaultReducePartitions(self): if self.ctx._conf.contains("spark.default.parallelism"): return self.ctx.defaultParallelism else: -<<<<<<< HEAD return 2 def getNumPartitions(self): @@ -398,16 +218,6 @@ def getNumPartitions(self): 2 """ return self._jdstream.partitions().size() -======= - return self.getNumPartitions() - - def getNumPartitions(self): - """ - Return the number of partitions in RDD - """ - # TODO: remove hardcoding. RDD has NumPartitions but DStream does not have. - return 2 ->>>>>>> clean up code def foreachRDD(self, func): """ @@ -610,53 +420,6 @@ def saveAsTextFile(rdd, time): # TODO: implement join # TODO: implement leftOuterJoin # TODO: implemtnt rightOuterJoin -======= - - - def reduceByWindow(self, reduceFunc, windowDuration, slideDuration, inReduceTunc): - """ - """ - - raise NotImplementedError - - def repartition(self, numPartitions): - """ - """ - raise NotImplementedError - - def slice(self, fromTime, toTime): - """ - """ - raise NotImplementedError - - def transform(self, transformFunc): - """ - """ - raise NotImplementedError - - def transformWith(self, other, transformFunc): - """ - """ - raise NotImplementedError - - def union(self, that): - """ - """ - raise NotImplementedError - - def window(self, windowDuration, slideDuration=None): - """ - """ - raise NotImplementedError - - def wrapRDD(self, rdd): - """ - """ - raise NotImplementedError - - def mapPartitionsWithIndex(self, f, preservesPartitioning=False): - return PipelinedDStream(self, f, preservesPartitioning) ->>>>>>> initial commit for pySparkStreaming class PipelinedDStream(DStream): diff --git a/python/pyspark/streaming/duration.py b/python/pyspark/streaming/duration.py index 245c137ecfc29..a7f1036e4b856 100644 --- a/python/pyspark/streaming/duration.py +++ b/python/pyspark/streaming/duration.py @@ -1,4 +1,3 @@ -<<<<<<< HEAD # # Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with @@ -19,12 +18,6 @@ from pyspark.streaming import utils -======= -__author__ = 'ktakagiw' - -from pyspark.streaming import utils - ->>>>>>> initial commit for pySparkStreaming class Duration(object): """ Duration for Spark Streaming application. Used to set duration @@ -50,7 +43,6 @@ def __init__(self, millis, _jvm=None): self._jduration = _jvm.Duration(millis) def toString(self): -<<<<<<< HEAD """ Return duration as string @@ -71,19 +63,11 @@ def isZero(self): >>> d_0.isZero() True """ -======= - """ Return duration as string """ - return str(self._millis) + " ms" - - def isZero(self): - """ Check if millis is zero """ ->>>>>>> initial commit for pySparkStreaming return self._millis == 0 def prettyPrint(self): """ Return a human-readable string representing a duration -<<<<<<< HEAD >>> d_10 = Duration(10) >>> d_10.prettyPrint() @@ -97,13 +81,10 @@ def prettyPrint(self): >>> d_1hour = Duration(60 * 60 * 1000) >>> d_1hour.prettyPrint() '1.00 h' -======= ->>>>>>> initial commit for pySparkStreaming """ return utils.msDurationToString(self._millis) def milliseconds(self): -<<<<<<< HEAD """ Return millisecond @@ -136,17 +117,6 @@ def max(self, other): 100 ms """ -======= - """ Return millisecond """ - return self._millis - - def toFormattedString(self): - """ Return millisecond """ - return str(self._millis) - - def max(self, other): - """ Return higher Duration """ ->>>>>>> initial commit for pySparkStreaming Duration._is_duration(other) if self > other: return self @@ -154,7 +124,6 @@ def max(self, other): return other def min(self, other): -<<<<<<< HEAD """ Return lower Durattion @@ -165,9 +134,6 @@ def min(self, other): 10 ms """ -======= - """ Return lower Durattion """ ->>>>>>> initial commit for pySparkStreaming Duration._is_duration(other) if self < other: return self @@ -175,7 +141,6 @@ def min(self, other): return other def __str__(self): -<<<<<<< HEAD """ >>> d_10 = Duration(10) >>> str(d_10) @@ -194,17 +159,10 @@ def __add__(self, other): >>> print d_110 110 ms """ -======= - return self.toString() - - def __add__(self, other): - """ Add Duration and Duration """ ->>>>>>> initial commit for pySparkStreaming Duration._is_duration(other) return Duration(self._millis + other._millis) def __sub__(self, other): -<<<<<<< HEAD """ Subtract Duration by Duration @@ -215,14 +173,10 @@ def __sub__(self, other): 90 ms """ -======= - """ Subtract Duration by Duration """ ->>>>>>> initial commit for pySparkStreaming Duration._is_duration(other) return Duration(self._millis - other._millis) def __mul__(self, other): -<<<<<<< HEAD """ Multiple Duration by Duration @@ -233,9 +187,6 @@ def __mul__(self, other): 1000 ms """ -======= - """ Multiple Duration by Duration """ ->>>>>>> initial commit for pySparkStreaming Duration._is_duration(other) return Duration(self._millis * other._millis) @@ -243,7 +194,6 @@ def __div__(self, other): """ Divide Duration by Duration for Python 2.X -<<<<<<< HEAD >>> d_10 = Duration(10) >>> d_20 = Duration(20) @@ -251,8 +201,6 @@ def __div__(self, other): >>> print d_2 2 ms -======= ->>>>>>> initial commit for pySparkStreaming """ Duration._is_duration(other) return Duration(self._millis / other._millis) @@ -261,7 +209,6 @@ def __truediv__(self, other): """ Divide Duration by Duration for Python 3.0 -<<<<<<< HEAD >>> d_10 = Duration(10) >>> d_20 = Duration(20) @@ -269,14 +216,11 @@ def __truediv__(self, other): >>> print d_2 2 ms -======= ->>>>>>> initial commit for pySparkStreaming """ Duration._is_duration(other) return Duration(self._millis / other._millis) def __floordiv__(self, other): -<<<<<<< HEAD """ Divide Duration by Duration @@ -302,23 +246,10 @@ def __lt__(self, other): False """ -======= - """ Divide Duration by Duration """ - Duration._is_duration(other) - return Duration(self._millis // other._millis) - - def __len__(self): - """ Length of miilisecond in Duration """ - return len(self._millis) - - def __lt__(self, other): - """ Duration < Duration """ ->>>>>>> initial commit for pySparkStreaming Duration._is_duration(other) return self._millis < other._millis def __le__(self, other): -<<<<<<< HEAD """ Duration <= Duration @@ -346,19 +277,10 @@ def __eq__(self, other): True """ -======= - """ Duration <= Duration """ - Duration._is_duration(other) - return self.millis <= other._millis - - def __eq__(self, other): - """ Duration == Duration """ ->>>>>>> initial commit for pySparkStreaming Duration._is_duration(other) return self._millis == other._millis def __ne__(self, other): -<<<<<<< HEAD """ Duration != Duration @@ -371,14 +293,10 @@ def __ne__(self, other): False """ -======= - """ Duration != Duration """ ->>>>>>> initial commit for pySparkStreaming Duration._is_duration(other) return self._millis != other._millis def __gt__(self, other): -<<<<<<< HEAD """ Duration > Duration @@ -390,14 +308,10 @@ def __gt__(self, other): True """ -======= - """ Duration > Duration """ ->>>>>>> initial commit for pySparkStreaming Duration._is_duration(other) return self._millis > other._millis def __ge__(self, other): -<<<<<<< HEAD """ Duration >= Duration @@ -410,9 +324,6 @@ def __ge__(self, other): """ -======= - """ Duration >= Duration """ ->>>>>>> initial commit for pySparkStreaming Duration._is_duration(other) return self._millis >= other._millis @@ -426,15 +337,12 @@ def Milliseconds(milliseconds): """ Helper function that creates instance of [[pysparkstreaming.duration]] representing a given number of milliseconds. -<<<<<<< HEAD >>> milliseconds = Milliseconds(1) >>> d_1 = Duration(1) >>> milliseconds == d_1 True -======= ->>>>>>> initial commit for pySparkStreaming """ return Duration(milliseconds) @@ -442,7 +350,6 @@ def Seconds(seconds): """ Helper function that creates instance of [[pysparkstreaming.duration]] representing a given number of seconds. -<<<<<<< HEAD >>> seconds = Seconds(1) >>> d_1sec = Duration(1000) @@ -464,20 +371,3 @@ def Minutes(minutes): """ return Duration(minutes * 60 * 1000) -======= - """ - return Duration(seconds * 1000) - -def Minites(minites): - """ - Helper function that creates instance of [[pysparkstreaming.duration]] representing - a given number of minutes. - """ - return Duration(minutes * 60000) - -if __name__ == "__main__": - d = Duration(1) - print d - print d.milliseconds() - ->>>>>>> initial commit for pySparkStreaming diff --git a/python/pyspark/streaming/jtime.py b/python/pyspark/streaming/jtime.py index 9295c4ee27705..32ef741051283 100644 --- a/python/pyspark/streaming/jtime.py +++ b/python/pyspark/streaming/jtime.py @@ -1,4 +1,3 @@ -<<<<<<< HEAD # # Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with @@ -15,14 +14,10 @@ # See the License for the specific language governing permissions and # limitations under the License. # -======= -__author__ = 'ktakagiw' ->>>>>>> initial commit for pySparkStreaming from pyspark.streaming import utils from pyspark.streaming.duration import Duration -<<<<<<< HEAD """ The name of this file, time is not good naming for python because if we do import time when we want to use native python time package, it does @@ -30,8 +25,6 @@ """ -======= ->>>>>>> initial commit for pySparkStreaming class Time(object): """ Time for Spark Streaming application. Used to set Time diff --git a/python/pyspark/streaming/pyprint.py b/python/pyspark/streaming/pyprint.py deleted file mode 100644 index 4beb66950d851..0000000000000 --- a/python/pyspark/streaming/pyprint.py +++ /dev/null @@ -1,72 +0,0 @@ -<<<<<<< HEAD -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - - -import sys -from itertools import chain - -from pyspark.serializers import PickleSerializer - - -def collect(binary_file_path): - """ - Read pickled file written by SparkStreaming - """ -======= -import sys -from itertools import chain -from pyspark.serializers import PickleSerializer, BatchedSerializer, UTF8Deserializer - -def collect(binary_file_path): ->>>>>>> initial commit for pySparkStreaming - dse = PickleSerializer() - with open(binary_file_path, 'rb') as tempFile: - for item in dse.load_stream(tempFile): - yield item -<<<<<<< HEAD - - -======= ->>>>>>> initial commit for pySparkStreaming -def main(): - try: - binary_file_path = sys.argv[1] - except: -<<<<<<< HEAD - print "Missed FilePath in argements" -======= - print "Missed FilePath in argement" ->>>>>>> initial commit for pySparkStreaming - - if not binary_file_path: - return - - counter = 0 - for rdd in chain.from_iterable(collect(binary_file_path)): - print rdd - counter = counter + 1 - if counter >= 10: - print "..." - break - -<<<<<<< HEAD - -======= ->>>>>>> initial commit for pySparkStreaming -if __name__ =="__main__": - exit(main()) diff --git a/python/pyspark/streaming/utils.py b/python/pyspark/streaming/utils.py index 56bb0ca1e9620..ad1dab0696b1a 100644 --- a/python/pyspark/streaming/utils.py +++ b/python/pyspark/streaming/utils.py @@ -37,9 +37,6 @@ def __str__(self): class Java: implements = ['org.apache.spark.streaming.api.python.PythonRDDFunction'] -======= -__author__ = 'ktakagiw' ->>>>>>> initial commit for pySparkStreaming def msDurationToString(ms): """ @@ -53,7 +50,6 @@ def msDurationToString(ms): return "%d ms" % ms elif ms < minute: return "%.1f s" % (float(ms) / second) -<<<<<<< HEAD elif ms < hour: return "%.1f m" % (float(ms) / minute) else: @@ -64,9 +60,3 @@ def rddToFileName(prefix, suffix, time): return prefix + "-" + str(time) + "." + suffix else: return prefix + "-" + str(time) -======= - elif ms < hout: - return "%.1f m" % (float(ms) / minute) - else: - return "%.2f h" % (float(ms) / hour) ->>>>>>> initial commit for pySparkStreaming diff --git a/streaming/pom.xml b/streaming/pom.xml index cb38015c24622..483e200ff9f16 100644 --- a/streaming/pom.xml +++ b/streaming/pom.xml @@ -21,11 +21,7 @@ org.apache.spark spark-parent -<<<<<<< HEAD 1.2.0-SNAPSHOT -======= - 1.0.0 ->>>>>>> initial commit for pySparkStreaming ../pom.xml diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala index 998fa24eba91b..cb9014cda6311 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala @@ -17,7 +17,6 @@ package org.apache.spark.streaming.api.python -<<<<<<< HEAD import java.io._ import java.io.{ObjectInputStream, IOException} import java.util.{List => JList, ArrayList => JArrayList, Map => JMap, Collections} @@ -37,7 +36,6 @@ import org.apache.spark.streaming.dstream._ import org.apache.spark.streaming.api.java._ - class PythonDStream[T: ClassTag]( parent: DStream[T], command: Array[Byte], @@ -48,46 +46,6 @@ class PythonDStream[T: ClassTag]( broadcastVars: JList[Broadcast[Array[Byte]]], accumulator: Accumulator[JList[Array[Byte]]]) extends DStream[Array[Byte]](parent.ssc) { -======= -import java.util.{List => JList, ArrayList => JArrayList, Map => JMap, Collections} - -import org.apache.spark.api.java.{JavaSparkContext, JavaPairRDD, JavaRDD} -import org.apache.spark.broadcast.Broadcast -import org.apache.spark._ -import org.apache.spark.util.Utils -import java.io._ -import scala.Some -import org.apache.spark.streaming.Duration -import scala.util.control.Breaks._ -import org.apache.spark.broadcast.Broadcast -import scala.Some -import org.apache.spark.streaming.Duration -import org.apache.spark.rdd.RDD -import org.apache.spark.api.python.PythonRDD - - -import org.apache.spark.streaming.{Duration, Time} -import org.apache.spark.streaming.dstream._ -import org.apache.spark.streaming.api.java._ -import org.apache.spark.rdd.RDD -import org.apache.spark.api.python._ -import org.apache.spark.api.python.PairwiseRDD - - -import scala.reflect.ClassTag - - -class PythonDStream[T: ClassTag]( - parent: DStream[T], - command: Array[Byte], - envVars: JMap[String, String], - pythonIncludes: JList[String], - preservePartitoning: Boolean, - pythonExec: String, - broadcastVars: JList[Broadcast[Array[Byte]]], - accumulator: Accumulator[JList[Array[Byte]]] - ) extends DStream[Array[Byte]](parent.ssc) { ->>>>>>> initial commit for pySparkStreaming override def dependencies = List(parent) @@ -102,14 +60,11 @@ class PythonDStream[T: ClassTag]( case None => None } } -<<<<<<< HEAD def foreachRDD(foreachFunc: PythonRDDFunction) { new PythonForeachDStream(this, context.sparkContext.clean(foreachFunc, false)).register() } -======= ->>>>>>> initial commit for pySparkStreaming val asJavaDStream = JavaDStream.fromDStream(this) /** @@ -132,11 +87,6 @@ class PythonDStream[T: ClassTag]( tempFileStream.close() // This value has to be passed from python -<<<<<<< HEAD - //val pythonExec = new ProcessBuilder().environment().get("PYSPARK_PYTHON") -======= - val pythonExec = new ProcessBuilder().environment().get("PYSPARK_PYTHON") ->>>>>>> initial commit for pySparkStreaming val sparkHome = new ProcessBuilder().environment().get("SPARK_HOME") //val pb = new ProcessBuilder(Seq(pythonExec, sparkHome + "/python/pyspark/streaming/pyprint.py", tempFile.getAbsolutePath())) // why this fails to compile??? //absolute path to the python script is needed to change because we do not use pysparkstreaming @@ -174,26 +124,6 @@ class PythonDStream[T: ClassTag]( } } -/* -private class PairwiseDStream(prev:DStream[Array[Byte]]) extends -DStream[(Long, Array[Byte])](prev.ssc){ - override def dependencies = List(prev) - - override def slideDuration: Duration = prev.slideDuration - - override def compute(validTime:Time):Option[RDD[(Long, Array[Byte])]]={ - prev.getOrCompute(validTime) match{ - case Some(rdd)=>Some(rdd) - val pairwiseRDD = new PairwiseRDD(rdd) - Some(pairwiseRDD.asJavaPairRDD.rdd) - case None => None - } - } -<<<<<<< HEAD - val asJavaPairDStream : JavaPairDStream[Long, Array[Byte]] = JavaPairDStream.fromJavaDStream(this) -} - - private class PythonPairwiseDStream(prev:DStream[Array[Byte]], partitioner: Partitioner) extends DStream[Array[Byte]](prev.ssc){ override def dependencies = List(prev) @@ -274,12 +204,3 @@ class PythonTransformedDStream( //val asJavaPairDStream : JavaPairDStream[Long, Array[Byte]] = JavaPairDStream.fromJavaDStream(this) } */ -======= - val asJavaPairDStream : JavaPairDStream[Long, Array[Byte]] = JavaPairDStream(this) -} -*/ - - - - ->>>>>>> initial commit for pySparkStreaming