[SPARK-18834][SS] Expose event time stats through StreamingQueryProgress #16258
Conversation
```
"numInputRows" : 678,
"inputRowsPerSecond" : 10.0,
"durationMs" : {
  "total" : 0
},
"currentWatermark" : 3,
"queryTimestamps" : {
  "eventTime.avg" : "2016-12-05T20:54:20.827Z",
```
@marmbrus the dot is in the key. Does this mean that when parsing with our JSON DataFrame support, we may have to refer to the column as progress.queryTimestamps.`eventTime.max`?
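To illustrate the ambiguity with a standalone sketch (this is not Spark's actual column resolver, just a toy model of dotted-path lookup): a resolver that splits the path on dots can never reach a key that itself contains a dot, which is why backtick quoting would be needed.

```scala
// Toy resolver: splits a field path on '.', so a key that itself
// contains a dot ("eventTime.max") can never be reached this way.
def resolve(root: Map[String, Any], path: String): Option[Any] =
  path.split('.').foldLeft(Option[Any](root)) {
    case (Some(m: Map[String, Any] @unchecked), key) => m.get(key)
    case _ => None
  }

val progress: Map[String, Any] =
  Map("queryTimestamps" -> Map("eventTime.max" -> "2016-12-05T20:54:20.827Z"))

// Splitting on dots looks for queryTimestamps -> eventTime -> max: not found.
val naive = resolve(progress, "queryTimestamps.eventTime.max")

// Treating the dotted segment as one key (what backticks express) works.
val quoted = progress.get("queryTimestamps") match {
  case Some(m: Map[String, Any] @unchecked) => m.get("eventTime.max")
  case _ => None
}
```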
One option is we could use dashes instead, but honestly I'm getting a little confused by all these timestamps. In particular, I'm not sure what the difference is between `triggerTimestamp` and `processingTime` (and I think that having `processingTime` mean anything different from `System.currentTimeMillis` will be confusing to users coming from other systems).
The two things that I think you really want to be able to track from the metrics feed are:
- The actual timestamp when this progress update was produced. I think this should remain top level and be called `timestamp`.
- Stats about the event time, so that I can know roughly what data is present in this batch. This can be useful for several reasons, including finding other problems upstream. The more I think about this, I think you just want to see this as:
  `"eventTime": { "min": ..., "max": ..., "watermark": ... }`
It's actually not clear to me why you need to know the original batch start time. In 99% of cases this is the same as `timestamp`. If you are re-executing a batch due to a failure, it will be different. But why does an outside monitoring job care? I can't come up with any interesting graphs that you would make with this extra field.
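Under this proposal, a progress event might look roughly like the following (a hypothetical sketch of the shape only; the values are made up for illustration):

```json
{
  "timestamp" : "2016-12-05T20:54:20.827Z",
  "numInputRows" : 678,
  "inputRowsPerSecond" : 10.0,
  "eventTime" : {
    "min" : "2016-12-05T20:52:00.000Z",
    "max" : "2016-12-05T20:54:10.000Z",
    "watermark" : "2016-12-05T20:44:00.000Z"
  }
}
```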
My view is that the `StreamingQueryProgress` class is not just for monitoring but for debugging as well. The batchProcessingTime may be important for debugging why a batch generated certain results in that 1% of cases where the trigger time is different from the processing time, and in those cases there is no other way to find out what the batchProcessingTime of that batch was if it is not exposed through the Progress API.
That said, we could hold off on exposing batchProcessingTime now and expose only eventTime. But it may be more complex to add another new field in Progress to expose the processing time later (as it cannot be added to the map once we name it `eventTime`).
It is available; it is stored in a human-readable format in the offset log (BTW, is that a long or a timestamp?). I think in the long run we'll want to open up another API that gives you access to this log, but I think that can come later. For now, it's still pretty easy to find.
But the log gets cleaned up continuously, so it will not be available if you are trying to debug an hour or a day later.
Not any more... we keep 1000 now, right? This just really feels like we are reaching for a use case. We can always add it, but I think the way it is done currently in this PR is very confusing, and having just `timestamp` and `eventTime` is very clear.
1000 * 5-second batches is < 2 hours. Anyways, I think it will be equally or more confusing to add it later as a top-level field in Progress (e.g. `timestamp`, `eventTime`, `processingTime`?). It may be better to have the top-level `timestamp`, and all other execution-level detailed timestamps inside a single map.
Anyways, I am updating the PR, but I think this is a little short-sighted.
```
@@ -38,13 +38,18 @@ class StreamingQueryStatusAndProgressSuite extends SparkFunSuite {
     "id" : "${testProgress1.id.toString}",
     "runId" : "${testProgress1.runId.toString}",
     "name" : "myName",
     "timestamp" : "2016-12-05T20:54:20.827Z",
     "triggerTimestamp" : "2016-12-05T20:54:20.827Z",
```
@marmbrus One idea is to not have a top-level timestamp, and merge this with `queryTimestamps`. Then this would be a key `triggerStartTime` in the `queryTimestamps` map. In fact, then we can rename `queryTimestamps` to `timestamps`.
```
@@ -360,6 +360,24 @@ class StreamExecution(
    if (hasNewData) {
      // Current batch timestamp in milliseconds
      offsetSeqMetadata.batchTimestampMs = triggerClock.getTimeMillis()
      // Update the eventTime watermark if we find one in the plan.
```
I moved this such that the watermark is updated before starting a batch, rather than after finishing a batch. This keeps `batchWatermarkMs` consistent with `batchTimestampMs` (both are set before starting a batch) and reduces complexity in the ProgressReporter.
+1, this makes sense.
```
@@ -33,27 +34,6 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils

/**
 * :: Experimental ::
 * Information about updates made to stateful operators in a [[StreamingQuery]] during a trigger.
```
I moved this below to keep the StreamingQueryProgress class first in the file. More important code first.
That's actually the opposite of how most code in SQL is laid out, so I think it would be better to avoid this change. The logic here is that declarations that are used later should come first (references before declarations make the code harder to read), and stuff at the end of the file is kind of hidden.
aah, then for consistency, SourceProgress and SinkProgress should also be before StreamingQueryProgress. But that's a bigger change that should be done in a different PR.
```
@@ -44,7 +44,12 @@ class StreamingQueryStatusAndProgressSuite extends SparkFunSuite {
    "durationMs" : {
      "total" : 0
    },
    "currentWatermark" : 3,
    "eventTime" : {
```
awesome!
Test build #70042 has finished for PR 16258 at commit
```
@@ -67,7 +67,7 @@ class StateOperatorProgress private[sql](
 * Similarly, when there is no data to be processed, the batchId will not be
 * incremented.
 * @param durationMs The amount of time taken to perform various operations in milliseconds.
 * @param currentWatermark The current event time watermark in milliseconds
 * @param eventTime Statistics of event time seen in this batch
```
nit: since you are touching this file, could you also fix the comment of `@param timestamp`? It's better to document the format as well, such as: "The beginning time of the trigger in ISO 8601 format, e.g. 2016-12-05T20:54:20.827Z".
In addition, could you also add an example in the comment of `eventTime`?
Looks good overall. Just some nits.
```
  this.count += that.count
}

def avg: Long = (sum.toDouble / count).toLong
```
nit: why not use `sum / count`?
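The two forms agree for typical values, since `Long` division already truncates toward zero; routing the sum through `Double` only differs once the sum exceeds a `Double`'s 53-bit mantissa and low-order bits are lost. A standalone sketch (not the PR's code) comparing the two:

```scala
// Standalone comparison of the two averaging styles discussed above.
def avgViaDouble(sum: Long, count: Long): Long = (sum.toDouble / count).toLong
def avgViaLong(sum: Long, count: Long): Long = sum / count

// Typical values: identical results (both truncate toward zero).
val small1 = avgViaDouble(7L, 2L)
val small2 = avgViaLong(7L, 2L)

// Sums beyond 2^53 lose low-order bits when routed through Double.
val bigSum = (1L << 60) + 1L
val lossy = avgViaDouble(bigSum, 1L)  // the +1 is rounded away
val exact = avgViaLong(bigSum, 1L)    // exact integer division
```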
```
override protected def doExecute(): RDD[InternalRow] = {
  child.execute().mapPartitions { iter =>
    val getEventTime = UnsafeProjection.create(eventTime :: Nil, child.output)
    iter.map { row =>
      maxEventTime.add(getEventTime(row).getLong(0))
      eventTimeStats.add((getEventTime(row).getLong(0).toDouble / 1000).toLong)
```
nit: could be `getEventTime(row).getLong(0) / 1000`
```
/** Tracks the maximum positive long seen. */
class MaxLong(protected var currentValue: Long = 0)
  extends AccumulatorV2[Long, Long] {
/** Class for collecting event time stats with an accumulator */
```
nit: please document the time unit.
Documented it in EventTimeWatermarkExec
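As a rough, standalone sketch of the kind of stats value being accumulated here (the class name and the millisecond unit are assumptions for illustration, not Spark's exact `EventTimeStats` implementation):

```scala
// Hypothetical event-time stats holder; all times in epoch milliseconds.
case class EventTimeStatsSketch(
    var max: Long = Long.MinValue,
    var min: Long = Long.MaxValue,
    var sum: Long = 0L,
    var count: Long = 0L) {

  def add(eventTimeMs: Long): Unit = {
    max = math.max(max, eventTimeMs)
    min = math.min(min, eventTimeMs)
    sum += eventTimeMs
    count += 1
  }

  // Merge partial stats accumulated on another partition.
  def merge(that: EventTimeStatsSketch): Unit = {
    max = math.max(max, that.max)
    min = math.min(min, that.min)
    sum += that.sum
    count += that.count
  }

  def avg: Long = sum / count
}
```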
Test build #70051 has finished for PR 16258 at commit
Test build #70053 has finished for PR 16258 at commit
Test build #70052 has finished for PR 16258 at commit
Test build #70058 has finished for PR 16258 at commit
LGTM
Merging to master and 2.1
## What changes were proposed in this pull request?

- Changed `StreamingQueryProgress.watermark` to `StreamingQueryProgress.queryTimestamps`, which is a `Map[String, String]` containing the following keys: "eventTime.max", "eventTime.min", "eventTime.avg", "processingTime", "watermark". All of them are UTC-formatted strings.
- Renamed `StreamingQuery.timestamp` to `StreamingQueryProgress.triggerTimestamp` to differentiate it from `queryTimestamps`. It has the timestamp of when the trigger was started.

## How was this patch tested?

Updated tests

Author: Tathagata Das <[email protected]>

Closes #16258 from tdas/SPARK-18834.

(cherry picked from commit c68fb42)
Signed-off-by: Tathagata Das <[email protected]>
```
 * @param currentWatermark The current event time watermark in milliseconds
 * @param eventTime Statistics of event time seen in this batch. It may contain the following keys:
 * {
 *   "max" -> "2016-12-05T20:54:20.827Z" // maximum event time seen in this trigger
```
Hi all, I am just leaving a comment as a gentle reminder that we probably should replace `<` and `>` with alternatives such as `{@literal <}` and `{@literal >}` in the future. Please refer to #16013 (comment). This breaks the javadoc 8 build.
```
[error] .../java/org/apache/spark/sql/streaming/StreamingQueryProgress.java:19: error: bad use of '>'
[error] * "max" -> "2016-12-05T20:54:20.827Z" // maximum event time seen in this trigger
[error]          ^
[error] .../java/org/apache/spark/sql/streaming/StreamingQueryProgress.java:20: error: bad use of '>'
[error] * "min" -> "2016-12-05T20:54:20.827Z" // minimum event time seen in this trigger
[error]          ^
[error] .../java/org/apache/spark/sql/streaming/StreamingQueryProgress.java:21: error: bad use of '>'
[error] * "avg" -> "2016-12-05T20:54:20.827Z" // average event time seen in this trigger
[error]          ^
[error] .../java/org/apache/spark/sql/streaming/StreamingQueryProgress.java:22: error: bad use of '>'
[error] * "watermark" -> "2016-12-05T20:54:20.827Z" // watermark used in this trigger
[error]                ^
```
…in months or years

## What changes were proposed in this pull request?

Two changes:
- Fix how delays specified in months and years are translated to milliseconds
- Following up on #16258, do not show the watermark when there is no watermarking in the query

## How was this patch tested?

Updated and new unit tests

Author: Tathagata Das <[email protected]>

Closes #16304 from tdas/SPARK-18834-1.

(cherry picked from commit 607a1e6)
Signed-off-by: Shixiong Zhu <[email protected]>