Skip to content

Commit

Permalink
[SPARK-20377][SS] Fix JavaStructuredSessionization example
Browse files Browse the repository at this point in the history
## What changes were proposed in this pull request?

Extra accessors in java bean class causes incorrect encoder generation, which corrupted the state when using timeouts.

## How was this patch tested?
manually ran the example

Author: Tathagata Das <[email protected]>

Closes #17676 from tdas/SPARK-20377.
  • Loading branch information
tdas committed Apr 18, 2017
1 parent f654b39 commit 74aa0df
Showing 1 changed file with 4 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,6 @@ public Iterator<Event> call(LineWithTimestamp lineWithTimestamp) throws Exceptio
for (String word : lineWithTimestamp.getLine().split(" ")) {
eventList.add(new Event(word, lineWithTimestamp.getTimestamp()));
}
System.out.println(
"Number of events from " + lineWithTimestamp.getLine() + " = " + eventList.size());
return eventList.iterator();
}
};
Expand All @@ -100,7 +98,7 @@ public Iterator<Event> call(LineWithTimestamp lineWithTimestamp) throws Exceptio
// If timed out, then remove session and send final update
if (state.hasTimedOut()) {
SessionUpdate finalUpdate = new SessionUpdate(
sessionId, state.get().getDurationMs(), state.get().getNumEvents(), true);
sessionId, state.get().calculateDuration(), state.get().getNumEvents(), true);
state.remove();
return finalUpdate;

Expand Down Expand Up @@ -133,7 +131,7 @@ public Iterator<Event> call(LineWithTimestamp lineWithTimestamp) throws Exceptio
// Set timeout such that the session will be expired if no data received for 10 seconds
state.setTimeoutDuration("10 seconds");
return new SessionUpdate(
sessionId, state.get().getDurationMs(), state.get().getNumEvents(), false);
sessionId, state.get().calculateDuration(), state.get().getNumEvents(), false);
}
}
};
Expand Down Expand Up @@ -215,7 +213,8 @@ public void setStartTimestampMs(long startTimestampMs) {
public long getEndTimestampMs() { return endTimestampMs; }
public void setEndTimestampMs(long endTimestampMs) { this.endTimestampMs = endTimestampMs; }

public long getDurationMs() { return endTimestampMs - startTimestampMs; }
public long calculateDuration() { return endTimestampMs - startTimestampMs; }

@Override public String toString() {
return "SessionInfo(numEvents = " + numEvents +
", timestamps = " + startTimestampMs + " to " + endTimestampMs + ")";
Expand Down

0 comments on commit 74aa0df

Please sign in to comment.