From eddb4b2981b00be533f77cd757bcc2762ca2991f Mon Sep 17 00:00:00 2001
From: dengziming
Date: Tue, 31 Oct 2023 15:05:25 +0800
Subject: [PATCH] [SPARK-45736][EXAMPLE] Use \s+ as separator when testing kafka source and network source

---
 examples/src/main/python/streaming/network_wordcount.py      | 3 ++-
 .../src/main/python/streaming/network_wordjoinsentiments.py  | 5 +++--
 .../main/python/streaming/recoverable_network_wordcount.py   | 3 ++-
 examples/src/main/python/streaming/sql_network_wordcount.py  | 3 ++-
 .../src/main/python/streaming/stateful_network_wordcount.py  | 3 ++-
 examples/src/main/r/streaming/structured_network_wordcount.R | 2 +-
 6 files changed, 12 insertions(+), 7 deletions(-)

diff --git a/examples/src/main/python/streaming/network_wordcount.py b/examples/src/main/python/streaming/network_wordcount.py
index 5d8f53ede5b7a..ebe01d162a92e 100644
--- a/examples/src/main/python/streaming/network_wordcount.py
+++ b/examples/src/main/python/streaming/network_wordcount.py
@@ -25,6 +25,7 @@
  and then run the example
     `$ bin/spark-submit examples/src/main/python/streaming/network_wordcount.py localhost 9999`
 """
+import re
 import sys
 
 from pyspark import SparkContext
@@ -38,7 +39,7 @@
     ssc = StreamingContext(sc, 1)
 
     lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
-    counts = lines.flatMap(lambda line: line.split("\\s+"))\
+    counts = lines.flatMap(lambda line: re.split(r'\s+', line))\
         .map(lambda word: (word, 1))\
         .reduceByKey(lambda a, b: a + b)
     counts.pprint()
diff --git a/examples/src/main/python/streaming/network_wordjoinsentiments.py b/examples/src/main/python/streaming/network_wordjoinsentiments.py
index 9874d30ff540d..88a00474ef0b8 100644
--- a/examples/src/main/python/streaming/network_wordjoinsentiments.py
+++ b/examples/src/main/python/streaming/network_wordjoinsentiments.py
@@ -30,6 +30,7 @@
     localhost 9999`
 """
 
+import re
 import sys
 from typing import Tuple
 
@@ -55,7 +56,7 @@ def print_happiest_words(rdd: RDD[Tuple[float, str]]) -> None:
 
 def line_to_tuple(line: str) -> Tuple[str, str]:
     try:
-        k, v = line.split("\\s+")
+        k, v = re.split(r'\s+', line)
         return k, v
     except ValueError:
         return "", ""
@@ -67,7 +68,7 @@ def line_to_tuple(line: str) -> Tuple[str, str]:
 
     lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
 
-    word_counts = lines.flatMap(lambda line: line.split("\\s+")) \
+    word_counts = lines.flatMap(lambda line: re.split(r'\s+', line)) \
         .map(lambda word: (word, 1)) \
         .reduceByKey(lambda a, b: a + b)
 
diff --git a/examples/src/main/python/streaming/recoverable_network_wordcount.py b/examples/src/main/python/streaming/recoverable_network_wordcount.py
index 2a52945d375a1..21f8523495231 100644
--- a/examples/src/main/python/streaming/recoverable_network_wordcount.py
+++ b/examples/src/main/python/streaming/recoverable_network_wordcount.py
@@ -37,6 +37,7 @@
 """
 import datetime
 import os
+import re
 import sys
 
 from typing import List, Tuple
@@ -73,7 +74,7 @@ def createContext(host: str, port: int, outputPath: str) -> StreamingContext:
     # Create a socket stream on target ip:port and count the
     # words in input stream of \n delimited text (e.g. generated by 'nc')
     lines = ssc.socketTextStream(host, port)
-    words = lines.flatMap(lambda line: line.split("\\s+"))
+    words = lines.flatMap(lambda line: re.split(r'\s+', line))
     wordCounts = words.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)
 
     def echo(time: datetime.datetime, rdd: RDD[Tuple[str, int]]) -> None:
diff --git a/examples/src/main/python/streaming/sql_network_wordcount.py b/examples/src/main/python/streaming/sql_network_wordcount.py
index eaad73f526161..118f298a188b3 100644
--- a/examples/src/main/python/streaming/sql_network_wordcount.py
+++ b/examples/src/main/python/streaming/sql_network_wordcount.py
@@ -27,6 +27,7 @@
  and then run the example
     `$ bin/spark-submit examples/src/main/python/streaming/sql_network_wordcount.py localhost 9999`
 """
+import re
 import sys
 import datetime
 
@@ -56,7 +57,7 @@ def getSparkSessionInstance(sparkConf: SparkConf) -> SparkSession:
     # Create a socket stream on target ip:port and count the
     # words in input stream of \n delimited text (e.g. generated by 'nc')
     lines = ssc.socketTextStream(host, int(port))
-    words = lines.flatMap(lambda line: line.split("\\s+"))
+    words = lines.flatMap(lambda line: re.split(r'\s+', line))
 
     # Convert RDDs of the words DStream to DataFrame and run SQL query
     def process(time: datetime.datetime, rdd: RDD[str]) -> None:
diff --git a/examples/src/main/python/streaming/stateful_network_wordcount.py b/examples/src/main/python/streaming/stateful_network_wordcount.py
index 32dbc780531bf..4cbeef64b6fc3 100644
--- a/examples/src/main/python/streaming/stateful_network_wordcount.py
+++ b/examples/src/main/python/streaming/stateful_network_wordcount.py
@@ -29,6 +29,7 @@
     `$ bin/spark-submit examples/src/main/python/streaming/stateful_network_wordcount.py \
         localhost 9999`
 """
+import re
 import sys
 
 from typing import Iterable, Optional
@@ -50,7 +51,7 @@ def updateFunc(new_values: Iterable[int], last_sum: Optional[int]) -> int:
         return sum(new_values) + (last_sum or 0)
 
     lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
-    running_counts = lines.flatMap(lambda line: line.split("\\s+"))\
+    running_counts = lines.flatMap(lambda line: re.split(r'\s+', line))\
         .map(lambda word: (word, 1))\
         .updateStateByKey(updateFunc, initialRDD=initialStateRDD)
 
diff --git a/examples/src/main/r/streaming/structured_network_wordcount.R b/examples/src/main/r/streaming/structured_network_wordcount.R
index cda18ebc072ee..fe128c1c2a58a 100644
--- a/examples/src/main/r/streaming/structured_network_wordcount.R
+++ b/examples/src/main/r/streaming/structured_network_wordcount.R
@@ -44,7 +44,7 @@ port <- as.integer(args[[2]])
 lines <- read.stream("socket", host = hostname, port = port)
 
 # Split the lines into words
-words <- selectExpr(lines, "explode(split(value, ' ')) as word")
+words <- selectExpr(lines, "explode(split(value, '\\\\s+')) as word")
 
 # Generate running word count
 wordCounts <- count(groupBy(words, "word"))
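
Note on the Python changes: `str.split` treats its argument as a literal substring, not a regular expression, so the old `line.split("\\s+")` searched for the three literal characters `\s+` and almost never split anything. A minimal standalone sketch of the difference, runnable without Spark:

```python
import re

line = "hello   world\tfoo"

# str.split takes a literal separator: the substring "\s+" does not
# occur in the input, so the line comes back unsplit.
print(line.split("\\s+"))        # ['hello   world\tfoo']

# re.split treats r'\s+' as a regex and splits on any run of whitespace.
print(re.split(r'\s+', line))    # ['hello', 'world', 'foo']

# One caveat of re.split: leading whitespace yields an empty first
# token, which a word count would tally under the empty string.
print(re.split(r'\s+', "  hi"))  # ['', 'hi']
```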
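The R change is different in kind: the split happens inside a Spark SQL expression, so it uses SQL's `split(str, regex)` function, and the backslash has to survive two layers of string-literal parsing (the host language's and SQL's). A hedged PySpark sketch of the same double-escaping, assuming a local session (the input data and app name here are illustrative):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").appName("split-demo").getOrCreate()
df = spark.createDataFrame([("hello   world",)], ["value"])

# The Python literal '\\\\s+' becomes the SQL literal '\\s+', which the
# SQL parser unescapes to the Java regex \s+ passed to split().
words = df.selectExpr("explode(split(value, '\\\\s+')) as word")
words.show()  # two rows: 'hello' and 'world'

spark.stop()
```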