[SPARK-24748][SS] Support for reporting custom metrics via StreamingQuery Progress #21721
Conversation
Test build #92659 has finished for PR 21721 at commit
I think the SS metrics should be reported to the Spark UI. Then users can learn about application operations!
I like the idea of custom metrics, but I worry the specific one being proposed in this PR might not be a good idea, and that we might end up in the wrong place if we generalize from it.
Maybe a better proof of concept would be something like the total size of a MemorySink? (That would generalize pretty directly to other cases I expect would benefit from custom metrics, such as a caching StreamWriter reporting its cache size.)
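For illustration, here is a rough, self-contained sketch of what that proof of concept could look like. All names below are invented for the example and are not the API proposed in this PR; the json() shape just mirrors the CustomMetrics idea discussed in this thread.
trait SinkMetricsSketch {
  def json(): String
}

// A sink-side metrics payload exposing the sink's current size.
final class MemorySinkSizeMetrics(currentRowCount: () => Long) extends SinkMetricsSketch {
  override def json(): String = s"""{"numRows": ${currentRowCount()}}"""
}

// Usage: the sink bumps a counter on every commit and hands the progress
// reporter a metrics view over it.
val committedRows = new java.util.concurrent.atomic.AtomicLong(0L)
val metrics = new MemorySinkSizeMetrics(() => committedRows.get())
committedRows.addAndGet(42L) // e.g. after committing a batch of 42 rows
assert(metrics.json() == """{"numRows": 42}""")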
@@ -0,0 +1,30 @@
/*
This should probably be in v2/reader/streaming.
("startOffset" -> tryParse(startOffset)) ~ | ||
("endOffset" -> tryParse(endOffset)) ~ | ||
("numInputRows" -> JInt(numInputRows)) ~ | ||
("inputRowsPerSecond" -> safeDoubleToJValue(inputRowsPerSecond)) ~ | ||
("processedRowsPerSecond" -> safeDoubleToJValue(processedRowsPerSecond)) | ||
|
||
if (customMetrics != null) { | ||
jsonVal ~ ("customMetrics" -> tryParse(customMetrics.json())) |
Is there any way to get an error to the user if their custom metrics fail to parse? I'm not entirely sure that's the right thing to do, but I worry that it'll be hard to develop against this API if we just silently drop malformed metrics.
Currently in case of error, it just reports the JSON string as is (similar to start/end offsets). However I agree we can add error reporting to this API. Will address.
@@ -379,3 +384,16 @@ private[kafka010] case class KafkaMicroBatchInputPartitionReader(
  }
 }
}

// Currently reports per topic-partition lag.
nit: javadoc style for top level comments
@@ -95,4 +95,25 @@ private object JsonUtils {
  }
  Serialization.write(result)
}

/**
 * Write per-topic partition lag as json string
Is "lag" here just the difference (at the time a batch ends) between the last offset Spark knows about and the last offset Spark has processed? I'm not sure this is super useful to know. If maxOffsets isn't set it's always going to be 0, no matter how far Spark gets behind the Kafka cluster.
This is the difference between the latest offsets in Kafka at the time the metrics are reported (just after a micro-batch completes) and the latest offset Spark has processed. It can be 0 if Spark keeps up with the rate at which messages are ingested into the Kafka topics (steady state).
I would assume we would always want to set a reasonable micro-batch size by setting maxOffsetsPerTrigger. Otherwise Spark can end up processing the entire data in the topics in one micro-batch (e.g. if the starting offset is set to earliest, or the streaming job is stopped for some time and restarted). IMO we should address this by setting some sane defaults, which are currently missing.
If we want to handle the custom metrics for Kafka outside the scope of this PR I will raise a separate one, but this can be really useful to identify issues like data skew in some partitions or other issues causing Spark to not keep up with the ingestion rate.
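A small sketch of the lag computation described above (the string keys and the method name are illustrative, not the Kafka source's actual internals):
// Lag per topic-partition = latest offset Kafka knows about minus the last
// offset Spark has processed, floored at zero.
def perPartitionLag(
    latestOffsets: Map[String, Long],     // e.g. "topic-0" -> 1000
    processedOffsets: Map[String, Long]): Map[String, Long] = {
  latestOffsets.map { case (tp, latest) =>
    tp -> math.max(0L, latest - processedOffsets.getOrElse(tp, latest))
  }
}

// In steady state (Spark keeps up), the lag is 0 for every partition.
assert(perPartitionLag(Map("topic-0" -> 1000L), Map("topic-0" -> 850L)) == Map("topic-0" -> 150L))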
I'd suggest handling the custom metrics for Kafka outside the scope of this PR. Maybe we should have a default maxOffsets, but given that we don't I'm worried about adding a metric that's misleading in the default case.
Though I haven't taken a look yet, I would like to see this feature (mentioned in #21622 (comment)) and am happy to see it being implemented! While I love the feature, I agree with @jose-torres that this is going to be a new public API (part of DataSource V2), so it is worth discussing the API itself before settling on a specific implementation. (Maybe a kind of POC is OK for @jose-torres?)
@jose-torres @HeartSaVioR, addressed the initial comments. Will add writer support for custom metrics and add MemorySink as an example. I am OK with moving the Kafka custom metrics into a separate PR, but the lag metric is valuable IMO.
Test build #92694 has finished for PR 21721 at commit
Looks fine to me with a MemorySink example. I don't think a formal discussion is super necessary - the major advantage of the mixin model is to let us add things like this without impacting the broader API.
Test build #92870 has finished for PR 21721 at commit
Force-pushed from 47d802b to bca054f
@jose-torres I have removed the Kafka lag metrics from this PR and added writer metrics, with the number of rows in the memory sink as an example.
Test build #92874 has finished for PR 21721 at commit
Test build #92876 has finished for PR 21721 at commit
@jose-torres, addressed initial comments.
Looks good overall, left a couple of comments.
try {
  Some(parse(metrics.json()))
} catch {
  case ex: Exception => onInvalidMetrics(ex)
https://github.com/databricks/scala-style-guide#exception-handling-try-vs-try
According to the guide, this line should be replaced with case NonFatal(e) =>, and I'd place onInvalidMetrics and None at the same indentation.
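For reference, a sketch of what the suggested change to this snippet would look like (metrics and onInvalidMetrics are the values from the surrounding snippet; the parse import is the json4s one assumed to be used in that file):
import scala.util.control.NonFatal
import org.json4s.jackson.JsonMethods.parse

// Catch only non-fatal exceptions, report them, and fall back to None.
try {
  Some(parse(metrics.json()))
} catch {
  case NonFatal(e) =>
    onInvalidMetrics(e)
    None
}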
def extractMetrics(getMetrics: () => CustomMetrics,
    onInvalidMetrics: (Exception) => Unit): Option[JValue] = {
  val metrics = getMetrics()
  if (metrics != null) {
We could de-indent via an early return: it would be simpler because there's nothing to do but return None if metrics is null, and the style guide lists such cases as one of the exceptional cases where a return statement is preferred.
Replaced it with Option and map.
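A sketch of what that Option-based version could look like (signature per the later ProgressReporter snippet in this thread; CustomMetrics is the interface added in this PR, and the other imports are assumed from that file):
import scala.util.control.NonFatal
import org.json4s.JValue
import org.json4s.jackson.JsonMethods.parse

// Absence handled by Option; parse failures reported via the callback and dropped.
def extractMetrics(
    getMetrics: () => Option[CustomMetrics],
    onInvalidMetrics: (Exception) => Unit): Option[JValue] = {
  getMetrics().flatMap { metrics =>
    try {
      Some(parse(metrics.json()))
    } catch {
      case NonFatal(e) =>
        onInvalidMetrics(e)
        None
    }
  }
}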
import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;

/**
 * A mix in interface for {@link DataSourceWriter}. Data source writers can implement this
If we intend to create a new interface as a mix-in, we may not need to create individual interfaces for DataSourceReader and DataSourceWriter. We could have only one interface and let DataSourceReader and DataSourceWriter extend that mix-in interface.
The intention was to restrict the mixin so that it can be applied only to DataSourceReader and DataSourceWriter (a similar pattern is followed in other mixins) by inheriting the appropriate types. Unfortunately there's no common ancestor for the mixin to inherit from, so I had to duplicate the interface. Agree that it's not ideal.
A few options:
- Have a common ancestor marker interface (say DataSourceComponent) which is the supertype of DataSourceReader and DataSourceWriter. Then we can have a single mixin that is a subtype of that interface. We may encounter similar usages for other mixins in the future.
- The mixin does not inherit anything (neither DataSourceReader nor DataSourceWriter). Here we cannot impose a restriction on the type of classes the mixin can be applied to.
- Duplicate interfaces (the proposed option in the patch).
I prefer option 1, but would like to proceed based on the feedback.
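A sketch of option 1, written here as Scala traits purely to show the type relationships (the actual DataSourceV2 interfaces are Java; DataSourceComponent is only the name proposed above, and the method bodies are illustrative):
// Common marker supertype for readers and writers (option 1).
trait DataSourceComponent

trait DataSourceReader extends DataSourceComponent {
  // ... existing reader methods elided ...
}

trait DataSourceWriter extends DataSourceComponent {
  // ... existing writer methods elided ...
}

trait CustomMetrics {
  def json(): String
}

// A single custom-metrics mixin constrained to data source components, so it
// applies to both readers and writers without duplicating the interface.
trait SupportsCustomMetrics extends DataSourceComponent {
  def getCustomMetrics: CustomMetrics
  def onInvalidMetrics(ex: Exception): Unit = {}
}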
OK got your intention. I think it makes sense. I'm OK with all three options and personally prefer 1 or 2 if the intention is to mix-in, but let's see committers' feedback on this.
Test build #93793 has finished for PR 21721 at commit
@HeartSaVioR thanks for taking the time to review. Addressed the comments, can you take a look again? Regarding the mixin interface, I would like to get feedback from others. @jose-torres @tdas @zsxwing could you take a look at the patch and also comment on #21721 (comment)?
LGTM
CustomMetrics getCustomMetrics();

/**
 * Invoked if the custom metrics returned by {@link #getCustomMetrics()} is invalid
I thought this was a bit convoluted at first, but on reflection I can understand why this additional flexibility is valuable. I think it'd be worth writing an example here of what a source might want to do other than ignore the invalid metrics.
Oh wait, this is the same thing we talked about in the initial round of review. I think "throw an error when developing the connector so you can make sure your metrics work right" would still be a good alternative to mention in the doc.
Updated javadoc to explain the same.
Test build #93813 has finished for PR 21721 at commit
Looks like we would also need to add SourceProgress and SinkProgress to the MiMa exclude list.
@@ -163,7 +163,8 @@ class SourceProgress protected[sql](
     val endOffset: String,
     val numInputRows: Long,
     val inputRowsPerSecond: Double,
-    val processedRowsPerSecond: Double) extends Serializable {
+    val processedRowsPerSecond: Double,
+    val customMetrics: Option[JValue] = None) extends Serializable {
A default value does not work in the Java API, and MiMa probably complains about this. I think another constructor should be added instead of the default value to work around this.
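A sketch of the suggested workaround, with the field list trimmed for brevity (the real SourceProgress has more fields; customMetrics is shown as the plain String it later became in this PR):
package org.apache.spark.sql.streaming

// Illustrative, trimmed-down version: an explicit auxiliary constructor
// replaces the default parameter value, which Java callers (and MiMa
// binary-compatibility checks) don't handle well.
class SourceProgress protected[sql] (
    val description: String,
    val numInputRows: Long,
    val customMetrics: String) extends Serializable {

  /** SourceProgress without custom metrics. */
  protected[sql] def this(description: String, numInputRows: Long) =
    this(description, numInputRows, null)
}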
@@ -143,18 +151,46 @@ trait ProgressReporter extends Logging {
  }
  logDebug(s"Execution stats: $executionStats")

// extracts custom metrics from readers and writers
def extractMetrics(getMetrics: () => Option[CustomMetrics],
    onInvalidMetrics: (Exception) => Unit): Option[JValue] = {
nit:
def extractMetrics(
getMetrics: () => Option[CustomMetrics],
onInvalidMetrics: (Exception) => Unit): Option[JValue] = {
per https://github.com/databricks/scala-style-guide#spacing-and-indentation
 * An interface for reporting custom metrics from streaming sources and sinks
 */
@InterfaceStability.Evolving
public interface CustomMetrics {
The Java side should also be 2-space indented (see "Code Style Guide" in https://spark.apache.org/contributing.html).
@@ -163,7 +163,8 @@ class SourceProgress protected[sql](
     val endOffset: String,
     val numInputRows: Long,
     val inputRowsPerSecond: Double,
-    val processedRowsPerSecond: Double) extends Serializable {
+    val processedRowsPerSecond: Double,
+    val customMetrics: Option[JValue] = None) extends Serializable {
Wait ... this is an exposed API, right? I guess this is exposed to the Java API too (for instance query.lastProgress().sources())? In that case, we should avoid Scala's Option and org.json4s.*. If this is supposed to be hidden here, I think we should find a way to hide it with a package-level access modifier.
@HyukjinKwon Nice finding. I missed it while reviewing.
Btw, FYI, in #21469 I'm adding a new field with a default value in StateOperatorProgress, like val customMetrics: ju.Map[String, JLong] = new ju.HashMap(), and MiMa doesn't complain.
https://github.com/apache/spark/pull/21469/files#diff-e09301244e3c6b1a69eda6c4bd2ddb52
@arunmahadevan Maybe ju.Map[String, JLong] will also work here.
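For clarity, the suggested field shape spelled out with the usual aliases (a sketch only, not the final field list):
import java.{util => ju}
import java.lang.{Long => JLong}

// A Java-friendly map with a non-null default, so Java callers and MiMa are
// both happy without Scala Option or json4s types in the public API.
val customMetrics: ju.Map[String, JLong] = new ju.HashMap()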
Refactored to a JSON String instead of JValue.
@HyukjinKwon, I have addressed the comments and modified SourceProgress and SinkProgress to take a String instead of JValue so that this can be easily used from Java. Regarding the default value in the ctor, I am not sure it's an issue because the object is mostly read-only from the user's perspective and it would be an issue only if the user tries to construct it from Java. I have added overloaded ctors anyway. Please take a look.
Test build #93907 has finished for PR 21721 at commit
val customMetrics: String) extends Serializable {

  /** SourceProgress without custom metrics. */
  def this(
I would make it protected[sql] def this(.
changed.
Looks fine otherwise to me too if the test passes.
Test build #94008 has finished for PR 21721 at commit
retest this please
@arunmahadevan yeah, it's better to figure out the solution for continuous mode as well. As you mentioned, the current SQL metrics are not updated unless the task completes, so we may need to add new APIs to support reporting metrics for continuous mode. It would be great if there were a consistent API for all modes. Let's step back and think about it.
@zsxwing @gatorsmile @cloud-fan
The unfortunate thing is that continuous mode allows different epochs between partitions, hence query progress just doesn't fit, so it is hard to address SPARK-23887 for now. My 2 cents: continuous mode should also sync the epoch across the query so that all partitions deal with the same epoch. When we want to deal with aggregation it becomes essential. (Think about the state in a stateful operator if rows from multiple epochs are provided as input. What's the epoch of the state?)
There are many unknowns to be figured out for continuous mode. Though the way to capture the metrics would be different for continuous execution, the interface for what is reported is not expected to change. Given that we already report progress for micro-batch, that as a user of Spark the changes in this patch are quite useful for reporting custom metrics for what works right now, and that it does not impact other parts of the DataSourceV2 APIs (only the sources that want to report custom metrics would add the traits), IMO we can keep this and continue to investigate, in a time-bound manner, how to capture metrics for continuous mode.
/**
 * An interface for reporting custom metrics from streaming sources and sinks
 */
@InterfaceStability.Evolving
Shall we switch this to Unstable instead for now?
I opened #22296 in case we want.
Can we remove this and replace it with another one when we are clear on how to deal with continuous mode? I merged this after having sufficient talks with @HeartSaVioR. The JIRA (https://issues.apache.org/jira/browse/SPARK-23887) was filed by @jose-torres, who left a sign-off here. BTW, I hope such discussions would be made specifically in those PRs or JIRAs. Is this just because some concerns were found later? It looked just fine with many positive reviews, but from my point of view it looks like it is being held off for no reason.
@HyukjinKwon yes, we can mark it unstable. Like I mentioned multiple times in previous comments, the traits added here, like CustomMetrics, SupportsCustomReaderMetrics etc., have nothing specific to micro-batch or continuous mode and are unaffected when we finally start reporting progress for continuous mode. The way to collect and report metrics in continuous mode needs to be figured out, and I think it should be discussed in the respective JIRAs.
Note that the data source v2 API is not stable yet and we may even change the abstraction of the APIs. The design of custom metrics may affect the design of the streaming source APIs. I had a hard time figuring out the life cycle of custom metrics. It seems like their life cycle should be bound to an epoch, but unfortunately we don't have such an interface in continuous streaming to represent an epoch. Is it possible that we may end up with 2 sets of custom metrics APIs for micro-batch and continuous? The documentation added in this PR is not clear about this.
I actually thought all of them are part of DataSource V2. Why are we fine with changing those interfaces but not okay with this one, to the point of considering reverting it? Other things should of course be clarified if there are concerns. In this case, switching it to
Stuff like this merits API discussions, not just implementation changes ...
I'm confused by this API. Is this for streaming only? If yes, why is it not in the stream package? If not, I only found a streaming implementation. Maybe I missed it.
@rxin it's for streaming sources and sinks, as explained in the doc. It had to be shared between classes in reader.streaming and writer.streaming, so it was added in the parent package (similar to other streaming-specific classes that exist there, like StreamingWriteSupportProvider.java). We could move all of it to a streaming package.
@arunmahadevan, feel free to pick up the commits from my PR in your follow-up if they have to be changed. I will close mine.
I created a follow-up PR to move CustomMetrics (and a few other streaming-specific interfaces in that package) to 'streaming' and to mark the interfaces as Unstable: #22299
My 2 cents: the root reason is that the lifecycle of reporting query progress is tied to a batch. I'm not aware of how/when updated information from the nodes of the physical plan is transmitted from executors to the driver (I'm slowly expanding my knowledge and contributing what I can with that understanding), but we should avoid using the executed plan as a source to read information from, and find an alternative that is compatible with both micro-batch and continuous mode. This applies not only to metrics but also to watermarks. I'm not sure it is viable, but it could be via RPC or whatever, once we can aggregate the information on the driver. Then each operator can send information to the driver directly, and the driver can aggregate it and use it once a batch or an epoch is finished.
@cloud-fan we could still report progress at the end of each epoch (e.g. here and via the EpochCoordinator). There need not be separate interfaces for the progress or the custom metrics; just the mechanisms could be different.
I skimmed how AccumulatorV2 works, and it looks like the values in a task are reported along with the CompletionEvent which is triggered when a task ends. Then in continuous mode the driver doesn't even have updated metrics. It should not be coupled with the lifecycle of a task.
Can someone write a design doc for the metrics support? I think this is an important feature for data source v2 and we need to be careful here. The design doc should explain how custom metrics fit into the abstraction of the data source v2 API, how the metrics API would look for batch, micro-batch and continuous (I feel metrics are also important for batch sources), and how the sources report metrics physically (via task complete event? via heartbeat? via RPC?). @rxin just sent an email to the dev list about the data source v2 API abstraction; it would be great if you guys can kick it off and talk about the metrics support. It's very likely that the custom metrics API would be replaced by something totally different after we finish the design. I don't think we should rush into something that works but is not well designed.
So ... @cloud-fan and @rxin, how about this:
I will take a look at this tomorrow, since I'm already looking at the data source APIs myself. Can provide an opinion after another look on whether we should keep it unstable or revert.
Thank you @rxin for your time and efforts.
NOTE: This is not about custom metrics; it is about metrics which should also work for continuous mode. SPARK-23887 is the right place to discuss this, but since we are already talking about metrics here we could just talk here. Please let me know if it is not convenient to talk about SPARK-23887 from here: we can either comment on SPARK-23887 or start a new thread on the dev mailing list. If batch queries also leverage AccumulatorV2 for metrics, IMHO we might not need to redesign the metrics API from scratch. For batch and micro-batch the metrics API works without any concerns (it is getting requests for improvement though), and for continuous mode the metrics just don't work because tasks never finish. The change in metrics affects both the query status and the SQL tab in the UI. I haven't looked too deeply into metrics for continuous mode, so I am not sure about the current state of the UI or its ideal shape; I will spend time playing with it. My 2 cents: once we have the existing metrics working well, we could find some ways to make them work with continuous mode as well, without breaking other things. One thing I would like to ask ourselves is: would we treat the epoch id as the batch id? For checkpointing we already did, and in some streaming frameworks they represent
I spent more hours taking a look at how the SQL UI can update metrics information before a task ends, and now I think I may understand what @cloud-fan's concern was here. This is different from how we allow custom metrics in StateStore. All SQL metrics, even the custom metrics in StateStore, are accumulators, which are handled by the executor heartbeat (honestly I didn't notice it; my bad), and the UI updates this information. Custom metrics in StateStore are only updated when the state operation is about to finish for each partition, but they are exposed to the SQL UI anyway and get updated dynamically in the UI (I mean the values can be updated even for a running batch). With StreamingQueryProgress, we are also exposing information which is only calculated when needed, and that is when finishTrigger is called, so mostly when a batch ends. The custom metrics in this patch are placed there: they're additional information for StreamingQueryProgress, hence intentionally updated per batch. They're not actually SQL metrics, but the name may lead someone to wonder why they don't follow SQL metrics. Maybe the name matters? So there are two desires for adding custom information:
and the target of the patch is the latter. We know 2 applies only to micro-batch for now, and IMHO the current StreamingQueryProgress is not suitable for continuous mode for these reasons:
So IMHO it's unlikely that StreamingQueryProgress will be available for continuous mode (not only for custom metrics), and we may want to rely on running SQL metrics. That's actually how other streaming frameworks provide metrics as of now, but they also show these metrics as aggregated values in a time window or even as time series. Spark doesn't need such a feature for batch and micro-batch, but in continuous mode, without it these SQL metrics will be really hard to interpret after a long run (say 1 month). That's the hard thing when we want to make the modes transparent: the requirements of metrics for batch/micro-batch and continuous mode are just different, and metrics may not be the only issue.
Given the uncertainty about how this works across batch, streaming, and CP, and given that we are still fleshing out the main APIs, I think we should revert this and revisit when the main APIs are done. In general for API design, it is best to flesh out the big skeletons first, and then work on filling the gaps. Think about building a house: you build the frame, put the studs in, then the walls, and then do the final finish. You don't start by putting faucets in one room when you are still moving the main building structure.
BTW I think this is probably SPIP-worthy. At the very least we should write a design doc on this, similar to the other docs for dsv2 sub-components. We should really think about whether it'd be possible to unify the three modes (batch, micro-batch streaming, CP).
## What changes were proposed in this pull request?
Revert SPARK-24863 (#21819) and SPARK-24748 (#21721) as per discussion in #21721. We will revisit them when the data source v2 APIs are out.
## How was this patch tested?
Jenkins
Closes #22334 from zsxwing/revert-SPARK-24863-SPARK-24748.
Authored-by: Shixiong Zhu <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
Is there a JIRA whose closure would indicate that the DataSource API V2 is done and this PR could then go further? Is that SPARK-25531 or SPARK-22386? I am asking because my PR #22143 depends on this PR.
What changes were proposed in this pull request?
Currently the Structured Streaming sources and sinks do not have a way to report custom metrics. Providing an option to report custom metrics and making them available via the Streaming Query progress can enable sources and sinks to report custom progress information (e.g. the lag metrics for the Kafka source).
Similar metrics can be reported for sinks as well, but I would like to get initial feedback before proceeding further.
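To illustrate how this surfaces to users, a hedged sketch of reading the reported values back (it assumes the customMetrics String field this PR adds to SourceProgress; the JSON content shown in the comment is invented):
import org.apache.spark.sql.streaming.StreamingQuery

// Print whatever custom-metrics JSON each source reported in the latest
// progress, e.g. {"lag": {"topic-0": 150, "topic-1": 0}} for a Kafka-like
// source (the value shown here is purely illustrative).
def printCustomMetrics(query: StreamingQuery): Unit = {
  Option(query.lastProgress).foreach { progress =>
    progress.sources.foreach { source =>
      Option(source.customMetrics).foreach(println)
    }
  }
}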
How was this patch tested?
New and existing unit tests.
Please review http://spark.apache.org/contributing.html before opening a pull request.