Skip to content

Commit

Permalink
[SPARK-45736][EXAMPLE] Use \s+ as separator when testing kafka source…
Browse files Browse the repository at this point in the history
… and network source
  • Loading branch information
dengziming committed Oct 31, 2023
1 parent 2cc032d commit eddb4b2
Show file tree
Hide file tree
Showing 6 changed files with 12 additions and 7 deletions.
3 changes: 2 additions & 1 deletion examples/src/main/python/streaming/network_wordcount.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
and then run the example
`$ bin/spark-submit examples/src/main/python/streaming/network_wordcount.py localhost 9999`
"""
import re
import sys

from pyspark import SparkContext
Expand All @@ -38,7 +39,7 @@
ssc = StreamingContext(sc, 1)

lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
counts = lines.flatMap(lambda line: line.split("\\s+"))\
counts = lines.flatMap(lambda line: re.split(r'\s+', line))\
.map(lambda word: (word, 1))\
.reduceByKey(lambda a, b: a + b)
counts.pprint()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
localhost 9999`
"""

import re
import sys
from typing import Tuple

Expand All @@ -55,7 +56,7 @@ def print_happiest_words(rdd: RDD[Tuple[float, str]]) -> None:

def line_to_tuple(line: str) -> Tuple[str, str]:
try:
k, v = line.split("\\s+")
k, v = re.split(r'\s+', line)
return k, v
except ValueError:
return "", ""
Expand All @@ -67,7 +68,7 @@ def line_to_tuple(line: str) -> Tuple[str, str]:

lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))

word_counts = lines.flatMap(lambda line: line.split("\\s+")) \
word_counts = lines.flatMap(lambda line: re.split(r'\s+', line)) \
.map(lambda word: (word, 1)) \
.reduceByKey(lambda a, b: a + b)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
"""
import datetime
import os
import re
import sys
from typing import List, Tuple

Expand Down Expand Up @@ -73,7 +74,7 @@ def createContext(host: str, port: int, outputPath: str) -> StreamingContext:
# Create a socket stream on target ip:port and count the
# words in input stream of \n delimited text (e.g. generated by 'nc')
lines = ssc.socketTextStream(host, port)
words = lines.flatMap(lambda line: line.split("\\s+"))
words = lines.flatMap(lambda line: re.split(r'\s+', line))
wordCounts = words.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)

def echo(time: datetime.datetime, rdd: RDD[Tuple[str, int]]) -> None:
Expand Down
3 changes: 2 additions & 1 deletion examples/src/main/python/streaming/sql_network_wordcount.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
and then run the example
`$ bin/spark-submit examples/src/main/python/streaming/sql_network_wordcount.py localhost 9999`
"""
import re
import sys
import datetime

Expand Down Expand Up @@ -56,7 +57,7 @@ def getSparkSessionInstance(sparkConf: SparkConf) -> SparkSession:
# Create a socket stream on target ip:port and count the
# words in input stream of \n delimited text (e.g. generated by 'nc')
lines = ssc.socketTextStream(host, int(port))
words = lines.flatMap(lambda line: line.split("\\s+"))
words = lines.flatMap(lambda line: re.split(r'\s+', line))

# Convert RDDs of the words DStream to DataFrame and run SQL query
def process(time: datetime.datetime, rdd: RDD[str]) -> None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
`$ bin/spark-submit examples/src/main/python/streaming/stateful_network_wordcount.py \
localhost 9999`
"""
import re
import sys
from typing import Iterable, Optional

Expand All @@ -50,7 +51,7 @@ def updateFunc(new_values: Iterable[int], last_sum: Optional[int]) -> int:
return sum(new_values) + (last_sum or 0)

lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
running_counts = lines.flatMap(lambda line: line.split("\\s+"))\
running_counts = lines.flatMap(lambda line: re.split(r'\s+', line))\
.map(lambda word: (word, 1))\
.updateStateByKey(updateFunc, initialRDD=initialStateRDD)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ port <- as.integer(args[[2]])
lines <- read.stream("socket", host = hostname, port = port)

# Split the lines into words
words <- selectExpr(lines, "explode(split(value, ' ')) as word")
# Use Spark SQL's regex-based `split`; base R's strsplit() is not a SQL function
# and cannot be resolved inside a selectExpr() expression string.
# The backslash is escaped twice: once for the R string literal and once for the
# SQL string literal, so the regex engine receives \s+ (one or more whitespace).
words <- selectExpr(lines, "explode(split(value, '\\\\s+')) as word")

# Generate running word count
wordCounts <- count(groupBy(words, "word"))
Expand Down

0 comments on commit eddb4b2

Please sign in to comment.