[SPARK-24748][SS] Support for reporting custom metrics via StreamingQuery Progress #21721

Closed · wants to merge 2 commits
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.sources.v2;

import org.apache.spark.annotation.InterfaceStability;

/**
* An interface for reporting custom metrics from streaming sources and sinks
*/
@InterfaceStability.Evolving
Member: Shall we switch this to Unstable instead for now?

Member: I opened #22296 in case we want.

public interface CustomMetrics {
Member: The Java side should also be 2-space indented (see the "Code Style Guide" in https://spark.apache.org/contributing.html).

/**
* Returns a JSON serialized representation of custom metrics
*
* @return JSON serialized representation of custom metrics
*/
String json();
}
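
For illustration (not part of this diff), a minimal CustomMetrics implementation could look like the following. The class name, package, and metric key are made up; the hand-rolled JSON string simply keeps the sketch dependency-free.

package org.apache.spark.examples.streaming;  // hypothetical package

import org.apache.spark.sql.sources.v2.CustomMetrics;

/** Hypothetical metrics object reporting how many records a source has buffered. */
public class BufferedRecordsMetrics implements CustomMetrics {
  private final long bufferedRecords;

  public BufferedRecordsMetrics(long bufferedRecords) {
    this.bufferedRecords = bufferedRecords;
  }

  @Override
  public String json() {
    // Any JSON library works too; the returned string only needs to parse as valid JSON.
    return String.format("{\"bufferedRecords\": %d}", bufferedRecords);
  }
}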
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.sources.v2.reader.streaming;

import org.apache.spark.annotation.InterfaceStability;
import org.apache.spark.sql.sources.v2.CustomMetrics;
import org.apache.spark.sql.sources.v2.reader.DataSourceReader;

/**
* A mix-in interface for {@link DataSourceReader}. Data source readers can implement this
* interface to report custom metrics that get reported under
* {@link org.apache.spark.sql.streaming.SourceProgress}.
*/
@InterfaceStability.Evolving
public interface SupportsCustomReaderMetrics extends DataSourceReader {
/**
* Returns custom metrics specific to this data source.
*/
CustomMetrics getCustomMetrics();

/**
* Invoked if the custom metrics returned by {@link #getCustomMetrics()} are invalid
* (e.g. contain data that cannot be parsed). Throwing an error here ensures that your
* custom metrics are well-formed and that correct values are always reported. The default
* behavior is to ignore invalid metrics.
*
* @param ex the exception
*/
default void onInvalidMetrics(Exception ex) {
// default is to ignore invalid custom metrics
}
}
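
Implementations that would rather fail fast than silently drop malformed metrics can override onInvalidMetrics. A minimal sketch, assuming a hypothetical StrictMetricsReader base class (the remaining DataSourceReader methods are left abstract):

package org.apache.spark.examples.streaming;  // hypothetical package

import org.apache.spark.sql.sources.v2.reader.streaming.SupportsCustomReaderMetrics;

/** Hypothetical base class that surfaces malformed metrics instead of ignoring them. */
public abstract class StrictMetricsReader implements SupportsCustomReaderMetrics {
  @Override
  public void onInvalidMetrics(Exception ex) {
    // Throw so a buggy json() is noticed in testing rather than metrics being dropped.
    throw new IllegalStateException("Custom metrics could not be parsed", ex);
  }
}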
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.sources.v2.writer.streaming;

import org.apache.spark.annotation.InterfaceStability;
import org.apache.spark.sql.sources.v2.CustomMetrics;
import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;

/**
* A mix-in interface for {@link DataSourceWriter}. Data source writers can implement this
Contributor: If we intend to create a new interface as a mix-in, we may not need individual interfaces for DataSourceReader and DataSourceWriter. We could have a single interface and let DataSourceReader and DataSourceWriter extend that mix-in interface.

Contributor Author: The intention was to restrict the mixin so that it can be applied only to DataSourceReader and DataSourceWriter (the pattern followed by other mixins) by inheriting the appropriate types. Unfortunately there's no common ancestor for the mixin to inherit from, so I had to duplicate the interface. Agreed that it's not ideal.

A few options:

  1. Have a common ancestor marker interface (say DataSourceComponent) which is the supertype of DataSourceReader and DataSourceWriter. Then we can have a single mixin that is a subtype of that interface. We may encounter similar usages for other mixins in the future (a rough sketch of this option follows after this interface).
  2. The mixin does not inherit anything (neither DataSourceReader nor DataSourceWriter). Here we cannot restrict the types of classes the mixin can be applied to.
  3. Duplicate interfaces (the option proposed in this patch).

I prefer option 1, but would like to proceed based on the feedback.

Contributor: OK, got your intention. I think it makes sense. I'm OK with all three options and personally prefer 1 or 2 if the intention is a mix-in, but let's see the committers' feedback on this.

* interface to report custom metrics that get reported under
* {@link org.apache.spark.sql.streaming.SinkProgress}.
*/
@InterfaceStability.Evolving
public interface SupportsCustomWriterMetrics extends DataSourceWriter {
/**
* Returns custom metrics specific to this data source.
*/
CustomMetrics getCustomMetrics();

/**
* Invoked if the custom metrics returned by {@link #getCustomMetrics()} are invalid
* (e.g. contain data that cannot be parsed). Throwing an error here ensures that your
* custom metrics are well-formed and that correct values are always reported. The default
* behavior is to ignore invalid metrics.
*
* @param ex the exception
*/
default void onInvalidMetrics(Exception ex) {
// default is to ignore invalid custom metrics
}
}
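
To make the mix-in discussion above concrete, here is a rough sketch of option 1. It is not part of this patch; the DataSourceComponent and SupportsCustomMetrics names are hypothetical, and each interface would live in its own file.

package org.apache.spark.sql.sources.v2;

/** Hypothetical marker interface shared by DataSourceReader and DataSourceWriter. */
interface DataSourceComponent { }

// DataSourceReader and DataSourceWriter would then both declare:
//   public interface DataSourceReader extends DataSourceComponent { ... }
//   public interface DataSourceWriter extends DataSourceComponent { ... }

/** Hypothetical single mixin that could be applied to both readers and writers. */
interface SupportsCustomMetrics extends DataSourceComponent {
  CustomMetrics getCustomMetrics();

  default void onInvalidMetrics(Exception ex) {
    // Default: ignore invalid custom metrics.
  }
}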
@@ -22,14 +22,22 @@ import java.util.{Date, UUID}

import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.util.control.NonFatal

import org.json4s.JsonAST.JValue
import org.json4s.jackson.JsonMethods.parse

import org.apache.spark.internal.Logging
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.{EventTimeWatermark, LogicalPlan}
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec
import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader
import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2ScanExec, WriteToDataSourceV2Exec}
import org.apache.spark.sql.execution.streaming.sources.MicroBatchWriter
import org.apache.spark.sql.sources.v2.CustomMetrics
import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, SupportsCustomReaderMetrics}
import org.apache.spark.sql.sources.v2.writer.DataSourceWriter
import org.apache.spark.sql.sources.v2.writer.streaming.SupportsCustomWriterMetrics
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.streaming.StreamingQueryListener.QueryProgressEvent
import org.apache.spark.util.Clock
@@ -156,18 +164,51 @@ trait ProgressReporter extends Logging {
}
logDebug(s"Execution stats: $executionStats")

// extracts and validates custom metrics from readers and writers
def extractMetrics(
getMetrics: () => Option[CustomMetrics],
onInvalidMetrics: (Exception) => Unit): Option[String] = {
try {
getMetrics().map(m => {
val json = m.json()
parse(json)
json
})
} catch {
case ex: Exception if NonFatal(ex) =>
onInvalidMetrics(ex)
None
}
}

val sourceProgress = sources.distinct.map { source =>
val customReaderMetrics = source match {
case s: SupportsCustomReaderMetrics =>
extractMetrics(() => Option(s.getCustomMetrics), s.onInvalidMetrics)

case _ => None
}

val numRecords = executionStats.inputRows.getOrElse(source, 0L)
new SourceProgress(
description = source.toString,
startOffset = currentTriggerStartOffsets.get(source).orNull,
endOffset = currentTriggerEndOffsets.get(source).orNull,
numInputRows = numRecords,
inputRowsPerSecond = numRecords / inputTimeSec,
processedRowsPerSecond = numRecords / processingTimeSec
processedRowsPerSecond = numRecords / processingTimeSec,
customReaderMetrics.orNull
)
}
val sinkProgress = new SinkProgress(sink.toString)

val customWriterMetrics = dataSourceWriter match {
case Some(s: SupportsCustomWriterMetrics) =>
extractMetrics(() => Option(s.getCustomMetrics), s.onInvalidMetrics)

case _ => None
}

val sinkProgress = new SinkProgress(sink.toString, customWriterMetrics.orNull)

val newProgress = new StreamingQueryProgress(
id = id,
Expand Down Expand Up @@ -196,6 +237,18 @@ trait ProgressReporter extends Logging {
currentStatus = currentStatus.copy(isTriggerActive = false)
}

/** Extract writer from the executed query plan. */
private def dataSourceWriter: Option[DataSourceWriter] = {
if (lastExecution == null) return None
lastExecution.executedPlan.collect {
case p if p.isInstanceOf[WriteToDataSourceV2Exec] =>
Contributor: This only works for micro-batch mode; do we have a plan to support continuous mode?

Contributor Author: Yes, currently the progress is reported only for micro-batch mode. This should be supported for continuous mode as well once we start reporting progress there, but it needs some more work; see https://issues.apache.org/jira/browse/SPARK-23887.

Contributor: I see, thanks!

p.asInstanceOf[WriteToDataSourceV2Exec].writer
}.headOption match {
case Some(w: MicroBatchWriter) => Some(w.writer)
case _ => None
}
}

/** Extract statistics about stateful operators from the executed query plan. */
private def extractStateOperatorMetrics(hasNewData: Boolean): Seq[StateOperatorProgress] = {
if (lastExecution == null) return Nil
@@ -26,7 +26,7 @@ import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
* the non-streaming interface, forwarding the batch ID determined at construction to a wrapped
* streaming writer.
*/
class MicroBatchWriter(batchId: Long, writer: StreamWriter) extends DataSourceWriter {
class MicroBatchWriter(batchId: Long, val writer: StreamWriter) extends DataSourceWriter {
override def commit(messages: Array[WriterCommitMessage]): Unit = {
writer.commit(batchId, messages)
}
@@ -23,6 +23,9 @@ import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal

import org.json4s.NoTypeHints
import org.json4s.jackson.Serialization

import org.apache.spark.internal.Logging
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.InternalRow
@@ -32,9 +35,9 @@ import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils
import org.apache.spark.sql.catalyst.streaming.InternalOutputModes.{Append, Complete, Update}
import org.apache.spark.sql.execution.streaming.{MemorySinkBase, Sink}
import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, StreamWriteSupport}
import org.apache.spark.sql.sources.v2.{CustomMetrics, DataSourceOptions, DataSourceV2, StreamWriteSupport}
import org.apache.spark.sql.sources.v2.writer._
import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
import org.apache.spark.sql.sources.v2.writer.streaming.{StreamWriter, SupportsCustomWriterMetrics}
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.StructType

@@ -114,14 +117,25 @@ class MemorySinkV2 extends DataSourceV2 with StreamWriteSupport with MemorySinkB
batches.clear()
}

def numRows: Int = synchronized {
batches.foldLeft(0)(_ + _.data.length)
}

override def toString(): String = "MemorySinkV2"
}

case class MemoryWriterCommitMessage(partition: Int, data: Seq[Row])
extends WriterCommitMessage {}

class MemoryV2CustomMetrics(sink: MemorySinkV2) extends CustomMetrics {
private implicit val formats = Serialization.formats(NoTypeHints)
override def json(): String = Serialization.write(Map("numRows" -> sink.numRows))
}

class MemoryWriter(sink: MemorySinkV2, batchId: Long, outputMode: OutputMode, schema: StructType)
extends DataSourceWriter with Logging {
extends DataSourceWriter with SupportsCustomWriterMetrics with Logging {

private val memoryV2CustomMetrics = new MemoryV2CustomMetrics(sink)

override def createWriterFactory: MemoryWriterFactory = MemoryWriterFactory(outputMode, schema)

@@ -135,10 +149,16 @@ class MemoryWriter(sink: MemorySinkV2, batchId: Long, outputMode: OutputMode, sc
override def abort(messages: Array[WriterCommitMessage]): Unit = {
// Don't accept any of the new input.
}

override def getCustomMetrics: CustomMetrics = {
memoryV2CustomMetrics
}
}

class MemoryStreamWriter(val sink: MemorySinkV2, outputMode: OutputMode, schema: StructType)
extends StreamWriter {
extends StreamWriter with SupportsCustomWriterMetrics {

private val customMemoryV2Metrics = new MemoryV2CustomMetrics(sink)

override def createWriterFactory: MemoryWriterFactory = MemoryWriterFactory(outputMode, schema)

@@ -152,6 +172,10 @@ class MemoryStreamWriter(val sink: MemorySinkV2, outputMode: OutputMode, schema:
override def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {
// Don't accept any of the new input.
}

override def getCustomMetrics: CustomMetrics = {
customMemoryV2Metrics
}
}

case class MemoryWriterFactory(outputMode: OutputMode, schema: StructType)
@@ -163,7 +163,27 @@ class SourceProgress protected[sql](
val endOffset: String,
val numInputRows: Long,
val inputRowsPerSecond: Double,
val processedRowsPerSecond: Double) extends Serializable {
val processedRowsPerSecond: Double,
val customMetrics: String) extends Serializable {

/** SourceProgress without custom metrics. */
protected[sql] def this(
description: String,
startOffset: String,
endOffset: String,
numInputRows: Long,
inputRowsPerSecond: Double,
processedRowsPerSecond: Double) {

this(
description,
startOffset,
endOffset,
numInputRows,
inputRowsPerSecond,
processedRowsPerSecond,
null)
}

/** The compact JSON representation of this progress. */
def json: String = compact(render(jsonValue))
@@ -178,12 +198,18 @@ class SourceProgress protected[sql](
if (value.isNaN || value.isInfinity) JNothing else JDouble(value)
}

("description" -> JString(description)) ~
val jsonVal = ("description" -> JString(description)) ~
("startOffset" -> tryParse(startOffset)) ~
("endOffset" -> tryParse(endOffset)) ~
("numInputRows" -> JInt(numInputRows)) ~
("inputRowsPerSecond" -> safeDoubleToJValue(inputRowsPerSecond)) ~
("processedRowsPerSecond" -> safeDoubleToJValue(processedRowsPerSecond))

if (customMetrics != null) {
jsonVal ~ ("customMetrics" -> parse(customMetrics))
} else {
jsonVal
}
}

private def tryParse(json: String) = try {
@@ -202,7 +228,13 @@ class SourceProgress protected[sql](
*/
@InterfaceStability.Evolving
class SinkProgress protected[sql](
val description: String) extends Serializable {
val description: String,
val customMetrics: String) extends Serializable {

/** SinkProgress without custom metrics. */
protected[sql] def this(description: String) {
this(description, null)
}

/** The compact JSON representation of this progress. */
def json: String = compact(render(jsonValue))
@@ -213,6 +245,12 @@ class SinkProgress protected[sql](
override def toString: String = prettyJson

private[sql] def jsonValue: JValue = {
("description" -> JString(description))
val jsonVal = ("description" -> JString(description))

if (customMetrics != null) {
jsonVal ~ ("customMetrics" -> parse(customMetrics))
} else {
jsonVal
}
}
}
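
Finally, a usage sketch (not part of this patch) showing how the reported metrics can be read back from a query's last progress. It assumes only the customMetrics accessors added above plus the existing StreamingQuery.lastProgress API; the helper class name is made up.

import org.apache.spark.sql.streaming.SinkProgress;
import org.apache.spark.sql.streaming.SourceProgress;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryProgress;

/** Hypothetical helper that prints the custom metrics reported in the latest progress. */
final class CustomMetricsInspector {
  static void printCustomMetrics(StreamingQuery query) {
    StreamingQueryProgress progress = query.lastProgress();
    if (progress == null) {
      return;  // no trigger has completed yet
    }
    for (SourceProgress source : progress.sources()) {
      // customMetrics() is null unless the reader implements SupportsCustomReaderMetrics.
      System.out.println(source.description() + " -> " + source.customMetrics());
    }
    SinkProgress sink = progress.sink();
    // customMetrics() is null unless the writer implements SupportsCustomWriterMetrics.
    System.out.println(sink.description() + " -> " + sink.customMetrics());
  }
}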