[SPARK-45736][EXAMPLE] Use `\s+` as separator when testing kafka source and network source
dengziming committed Oct 31, 2023
1 parent 2fd1f10 commit 2cc032d
Showing 19 changed files with 20 additions and 20 deletions.
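For context: on the JVM, String.split interprets its separator as a regular expression, so " " matches only a single space (consecutive spaces produce empty tokens, and tabs are never split), while "\\s+" matches any run of whitespace. A minimal sketch of the failure mode, written in Python purely as illustration (not part of the commit):

# A single-space separator is fragile for word counting:
# runs of spaces yield empty tokens and tabs are not split at all.
line = "hello   world\tspark"
print(line.split(" "))  # ['hello', '', '', 'world\tspark']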
2 changes: 1 addition & 1 deletion examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredKafkaWordCount.java
@@ -78,7 +78,7 @@ public static void main(String[] args) throws Exception {

// Generate running word count
Dataset<Row> wordCounts = lines.flatMap(
- (FlatMapFunction<String, String>) x -> Arrays.asList(x.split(" ")).iterator(),
+ (FlatMapFunction<String, String>) x -> Arrays.asList(x.split("\\s+")).iterator(),
Encoders.STRING()).groupBy("value").count();

// Start running the query that prints the running counts to the console
2 changes: 1 addition & 1 deletion examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredKerberizedKafkaWordCount.java
@@ -115,7 +115,7 @@ public static void main(String[] args) throws Exception {

// Generate running word count
Dataset<Row> wordCounts = lines.flatMap(
- (FlatMapFunction<String, String>) x -> Arrays.asList(x.split(" ")).iterator(),
+ (FlatMapFunction<String, String>) x -> Arrays.asList(x.split("\\s+")).iterator(),
Encoders.STRING()).groupBy("value").count();

// Start running the query that prints the running counts to the console
2 changes: 1 addition & 1 deletion examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCount.java
@@ -61,7 +61,7 @@ public static void main(String[] args) throws Exception {

// Split the lines into words
Dataset<String> words = lines.as(Encoders.STRING()).flatMap(
- (FlatMapFunction<String, String>) x -> Arrays.asList(x.split(" ")).iterator(),
+ (FlatMapFunction<String, String>) x -> Arrays.asList(x.split("\\s+")).iterator(),
Encoders.STRING());

// Generate running word count
2 changes: 1 addition & 1 deletion examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCountWindowed.java
@@ -86,7 +86,7 @@ public static void main(String[] args) throws Exception {
.as(Encoders.tuple(Encoders.STRING(), Encoders.TIMESTAMP()))
.flatMap((FlatMapFunction<Tuple2<String, Timestamp>, Tuple2<String, Timestamp>>) t -> {
List<Tuple2<String, Timestamp>> result = new ArrayList<>();
- for (String word : t._1.split(" ")) {
+ for (String word : t._1.split("\\s+")) {
result.add(new Tuple2<>(word, t._2));
}
return result.iterator();
2 changes: 1 addition & 1 deletion examples/src/main/python/streaming/network_wordcount.py
@@ -38,7 +38,7 @@
ssc = StreamingContext(sc, 1)

lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
- counts = lines.flatMap(lambda line: line.split(" "))\
+ counts = lines.flatMap(lambda line: line.split("\\s+"))\
.map(lambda word: (word, 1))\
.reduceByKey(lambda a, b: a + b)
counts.pprint()
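One caveat on the Python files in this commit: unlike the JVM's String.split, Python's str.split treats its separator as a literal string, not a regular expression, so line.split("\\s+") looks for the three literal characters \s+ and otherwise returns the line unchanged. A whitespace-regex split in Python would go through the standard re module, as in this sketch (illustration only, not part of the commit):

import re

line = "hello   world"
print(line.split("\\s+"))      # ['hello   world'] - literal separator, no match
print(re.split(r"\s+", line))  # ['hello', 'world'] - regex split on whitespace runs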
4 changes: 2 additions & 2 deletions examples/src/main/python/streaming/network_wordjoinsentiments.py
@@ -55,7 +55,7 @@ def print_happiest_words(rdd: RDD[Tuple[float, str]]) -> None:

def line_to_tuple(line: str) -> Tuple[str, str]:
try:
- k, v = line.split(" ")
+ k, v = line.split("\\s+")
return k, v
except ValueError:
return "", ""
@@ -67,7 +67,7 @@ def line_to_tuple(line: str) -> Tuple[str, str]:

lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))

- word_counts = lines.flatMap(lambda line: line.split(" ")) \
+ word_counts = lines.flatMap(lambda line: line.split("\\s+")) \
.map(lambda word: (word, 1)) \
.reduceByKey(lambda a, b: a + b)
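A related consequence in this file: line_to_tuple above unpacks exactly two fields, and with a literal "\s+" separator the split returns the whole line as a single element, so the unpacking raises ValueError and the except branch yields ("", ""). A small standalone sketch of that behavior (illustration only, not part of the commit):

def line_to_tuple(line):
    # Mirrors the example's helper: the literal "\s+" separator
    # never matches ordinary text, so two-way unpacking fails.
    try:
        k, v = line.split("\\s+")
        return k, v
    except ValueError:
        return "", ""

print(line_to_tuple("word 0.5"))  # ('', '')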

2 changes: 1 addition & 1 deletion examples/src/main/python/streaming/recoverable_network_wordcount.py
@@ -73,7 +73,7 @@ def createContext(host: str, port: int, outputPath: str) -> StreamingContext:
# Create a socket stream on target ip:port and count the
# words in input stream of \n delimited text (e.g. generated by 'nc')
lines = ssc.socketTextStream(host, port)
- words = lines.flatMap(lambda line: line.split(" "))
+ words = lines.flatMap(lambda line: line.split("\\s+"))
wordCounts = words.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)

def echo(time: datetime.datetime, rdd: RDD[Tuple[str, int]]) -> None:
2 changes: 1 addition & 1 deletion examples/src/main/python/streaming/sql_network_wordcount.py
@@ -56,7 +56,7 @@ def getSparkSessionInstance(sparkConf: SparkConf) -> SparkSession:
# Create a socket stream on target ip:port and count the
# words in input stream of \n delimited text (e.g. generated by 'nc')
lines = ssc.socketTextStream(host, int(port))
- words = lines.flatMap(lambda line: line.split(" "))
+ words = lines.flatMap(lambda line: line.split("\\s+"))

# Convert RDDs of the words DStream to DataFrame and run SQL query
def process(time: datetime.datetime, rdd: RDD[str]) -> None:
2 changes: 1 addition & 1 deletion examples/src/main/python/streaming/stateful_network_wordcount.py
@@ -50,7 +50,7 @@ def updateFunc(new_values: Iterable[int], last_sum: Optional[int]) -> int:
return sum(new_values) + (last_sum or 0)

lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
- running_counts = lines.flatMap(lambda line: line.split(" "))\
+ running_counts = lines.flatMap(lambda line: line.split("\\s+"))\
.map(lambda word: (word, 1))\
.updateStateByKey(updateFunc, initialRDD=initialStateRDD)

2 changes: 1 addition & 1 deletion examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredKafkaWordCount.scala
@@ -77,7 +77,7 @@ object StructuredKafkaWordCount {
.as[String]

// Generate running word count
- val wordCounts = lines.flatMap(_.split(" ")).groupBy("value").count()
+ val wordCounts = lines.flatMap(_.split("\\s+")).groupBy("value").count()

// Start running the query that prints the running counts to the console
val query = wordCounts.writeStream
2 changes: 1 addition & 1 deletion examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredKerberizedKafkaWordCount.scala
@@ -117,7 +117,7 @@ object StructuredKerberizedKafkaWordCount {
.as[String]

// Generate running word count
- val wordCounts = lines.flatMap(_.split(" ")).groupBy("value").count()
+ val wordCounts = lines.flatMap(_.split("\\s+")).groupBy("value").count()

// Start running the query that prints the running counts to the console
val query = wordCounts.writeStream
2 changes: 1 addition & 1 deletion examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCount.scala
@@ -58,7 +58,7 @@ object StructuredNetworkWordCount {
.load()

// Split the lines into words
- val words = lines.as[String].flatMap(_.split(" "))
+ val words = lines.as[String].flatMap(_.split("\\s+"))

// Generate running word count
val wordCounts = words.groupBy("value").count()
2 changes: 1 addition & 1 deletion examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCountWindowed.scala
@@ -82,7 +82,7 @@ object StructuredNetworkWordCountWindowed {

// Split the lines into words, retaining timestamps
val words = lines.as[(String, Timestamp)].flatMap(line =>
- line._1.split(" ").map(word => (word, line._2))
+ line._1.split("\\s+").map(word => (word, line._2))
).toDF("word", "timestamp")

// Group the data by window and word and compute the count of each group
2 changes: 1 addition & 1 deletion examples/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala
@@ -71,7 +71,7 @@ object DirectKafkaWordCount {

// Get the lines, split them into words, count the words and print
val lines = messages.map(_.value)
- val words = lines.flatMap(_.split(" "))
+ val words = lines.flatMap(_.split("\\s+"))
val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
wordCounts.print()

2 changes: 1 addition & 1 deletion examples/src/main/scala/org/apache/spark/examples/streaming/DirectKerberizedKafkaWordCount.scala
@@ -111,7 +111,7 @@ object DirectKerberizedKafkaWordCount {

// Get the lines, split them into words, count the words and print
val lines = messages.map(_.value)
- val words = lines.flatMap(_.split(" "))
+ val words = lines.flatMap(_.split("\\s+"))
val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
wordCounts.print()

2 changes: 1 addition & 1 deletion examples/src/main/scala/org/apache/spark/examples/streaming/NetworkWordCount.scala
@@ -51,7 +51,7 @@ object NetworkWordCount {
// Note that no duplication in storage level only for running locally.
// Replication necessary in distributed scenario for fault tolerance.
val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
- val words = lines.flatMap(_.split(" "))
+ val words = lines.flatMap(_.split("\\s+"))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()
2 changes: 1 addition & 1 deletion examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
@@ -114,7 +114,7 @@ object RecoverableNetworkWordCount {
// Create a socket stream on target ip:port and count the
// words in input stream of \n delimited text (e.g. generated by 'nc')
val lines = ssc.socketTextStream(ip, port)
- val words = lines.flatMap(_.split(" "))
+ val words = lines.flatMap(_.split("\\s+"))
val wordCounts = words.map((_, 1)).reduceByKey(_ + _)
wordCounts.foreachRDD { (rdd: RDD[(String, Int)], time: Time) =>
// Get or register the excludeList Broadcast
2 changes: 1 addition & 1 deletion examples/src/main/scala/org/apache/spark/examples/streaming/SqlNetworkWordCount.scala
@@ -55,7 +55,7 @@ object SqlNetworkWordCount {
// Note that no duplication in storage level only for running locally.
// Replication necessary in distributed scenario for fault tolerance.
val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
- val words = lines.flatMap(_.split(" "))
+ val words = lines.flatMap(_.split("\\s+"))

// Convert RDDs of the words DStream to DataFrame and run SQL query
words.foreachRDD { (rdd: RDD[String], time: Time) =>
2 changes: 1 addition & 1 deletion examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala
@@ -54,7 +54,7 @@ object StatefulNetworkWordCount {
// Create a ReceiverInputDStream on target ip:port and count the
// words in input stream of \n delimited test (e.g. generated by 'nc')
val lines = ssc.socketTextStream(args(0), args(1).toInt)
- val words = lines.flatMap(_.split(" "))
+ val words = lines.flatMap(_.split("\\s+"))
val wordDstream = words.map(x => (x, 1))

// Update the cumulative count using mapWithState