From 74aa0df8f7f132b62754e5159262e4a5b9b641ab Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Tue, 18 Apr 2017 16:10:40 -0700 Subject: [PATCH] [SPARK-20377][SS] Fix JavaStructuredSessionization example ## What changes were proposed in this pull request? Extra accessors in java bean class causes incorrect encoder generation, which corrupted the state when using timeouts. ## How was this patch tested? manually ran the example Author: Tathagata Das Closes #17676 from tdas/SPARK-20377. --- .../sql/streaming/JavaStructuredSessionization.java | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredSessionization.java b/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredSessionization.java index da3a5dfe8628b..d3c8516882fa6 100644 --- a/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredSessionization.java +++ b/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredSessionization.java @@ -76,8 +76,6 @@ public Iterator call(LineWithTimestamp lineWithTimestamp) throws Exceptio for (String word : lineWithTimestamp.getLine().split(" ")) { eventList.add(new Event(word, lineWithTimestamp.getTimestamp())); } - System.out.println( - "Number of events from " + lineWithTimestamp.getLine() + " = " + eventList.size()); return eventList.iterator(); } }; @@ -100,7 +98,7 @@ public Iterator call(LineWithTimestamp lineWithTimestamp) throws Exceptio // If timed out, then remove session and send final update if (state.hasTimedOut()) { SessionUpdate finalUpdate = new SessionUpdate( - sessionId, state.get().getDurationMs(), state.get().getNumEvents(), true); + sessionId, state.get().calculateDuration(), state.get().getNumEvents(), true); state.remove(); return finalUpdate; @@ -133,7 +131,7 @@ public Iterator call(LineWithTimestamp lineWithTimestamp) throws Exceptio // Set timeout such that the session will be expired if no data received for 10 seconds state.setTimeoutDuration("10 seconds"); return new SessionUpdate( - sessionId, state.get().getDurationMs(), state.get().getNumEvents(), false); + sessionId, state.get().calculateDuration(), state.get().getNumEvents(), false); } } }; @@ -215,7 +213,8 @@ public void setStartTimestampMs(long startTimestampMs) { public long getEndTimestampMs() { return endTimestampMs; } public void setEndTimestampMs(long endTimestampMs) { this.endTimestampMs = endTimestampMs; } - public long getDurationMs() { return endTimestampMs - startTimestampMs; } + public long calculateDuration() { return endTimestampMs - startTimestampMs; } + @Override public String toString() { return "SessionInfo(numEvents = " + numEvents + ", timestamps = " + startTimestampMs + " to " + endTimestampMs + ")";