diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala
index 92b13f2b555d1..868edb5dcdc0c 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala
@@ -29,11 +29,6 @@ import org.json4s.jackson.Serialization
  */
 private object JsonUtils {
   private implicit val formats = Serialization.formats(NoTypeHints)
-  implicit val ordering = new Ordering[TopicPartition] {
-    override def compare(x: TopicPartition, y: TopicPartition): Int = {
-      Ordering.Tuple2[String, Int].compare((x.topic, x.partition), (y.topic, y.partition))
-    }
-  }
   /**
    * Read TopicPartitions from json string
    */
@@ -56,7 +51,7 @@
    * Write TopicPartitions as json string
    */
   def partitions(partitions: Iterable[TopicPartition]): String = {
-    val result = HashMap.empty[String, List[Int]]
+    val result = new HashMap[String, List[Int]]
     partitions.foreach { tp =>
       val parts: List[Int] = result.getOrElse(tp.topic, Nil)
       result += tp.topic -> (tp.partition::parts)
@@ -85,31 +80,19 @@ private object JsonUtils {
    * Write per-TopicPartition offsets as json string
    */
   def partitionOffsets(partitionOffsets: Map[TopicPartition, Long]): String = {
-    val result = HashMap.empty[String, HashMap[Int, Long]]
+    val result = new HashMap[String, HashMap[Int, Long]]()
+    implicit val ordering = new Ordering[TopicPartition] {
+      override def compare(x: TopicPartition, y: TopicPartition): Int = {
+        Ordering.Tuple2[String, Int].compare((x.topic, x.partition), (y.topic, y.partition))
+      }
+    }
     val partitions = partitionOffsets.keySet.toSeq.sorted // sort for more determinism
     partitions.foreach { tp =>
       val off = partitionOffsets(tp)
-      val parts = result.getOrElse(tp.topic, HashMap.empty[Int, Long])
+      val parts = result.getOrElse(tp.topic, new HashMap[Int, Long])
       parts += tp.partition -> off
       result += tp.topic -> parts
     }
     Serialization.write(result)
   }
-
-  /**
-   * Write per-topic partition lag as json string
-   */
-  def partitionLags(
-      latestOffsets: Map[TopicPartition, Long],
-      processedOffsets: Map[TopicPartition, Long]): String = {
-    val result = HashMap.empty[String, HashMap[Int, Long]]
-    val partitions = latestOffsets.keySet.toSeq.sorted
-    partitions.foreach { tp =>
-      val lag = latestOffsets(tp) - processedOffsets.getOrElse(tp, 0L)
-      val parts = result.getOrElse(tp.topic, HashMap.empty[Int, Long])
-      parts += tp.partition -> lag
-      result += tp.topic -> parts
-    }
-    Serialization.write(Map("lag" -> result))
-  }
 }
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReadSupport.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReadSupport.scala
index 70f37e32e78db..bb4de674c3c72 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReadSupport.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReadSupport.scala
@@ -22,7 +22,6 @@ import java.io._
 import java.nio.charset.StandardCharsets
 
 import org.apache.commons.io.IOUtils
-import org.apache.kafka.common.TopicPartition
 
 import org.apache.spark.SparkEnv
 import org.apache.spark.internal.Logging
@@ -33,9 +32,9 @@ import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, SerializedOffset, SimpleStreamingScanConfig,
   SimpleStreamingScanConfigBuilder}
 import org.apache.spark.sql.execution.streaming.sources.RateControlMicroBatchReadSupport
 import org.apache.spark.sql.kafka010.KafkaSourceProvider.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE}
-import org.apache.spark.sql.sources.v2.{CustomMetrics, DataSourceOptions}
+import org.apache.spark.sql.sources.v2.DataSourceOptions
 import org.apache.spark.sql.sources.v2.reader._
-import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReadSupport, Offset, SupportsCustomReaderMetrics}
+import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReadSupport, Offset}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.UninterruptibleThread
@@ -61,8 +60,7 @@ private[kafka010] class KafkaMicroBatchReadSupport(
     options: DataSourceOptions,
     metadataPath: String,
     startingOffsets: KafkaOffsetRangeLimit,
-    failOnDataLoss: Boolean)
-  extends RateControlMicroBatchReadSupport with SupportsCustomReaderMetrics with Logging {
+    failOnDataLoss: Boolean) extends RateControlMicroBatchReadSupport with Logging {
 
   private val pollTimeoutMs = options.getLong(
     "kafkaConsumer.pollTimeoutMs",
@@ -156,13 +154,6 @@ private[kafka010] class KafkaMicroBatchReadSupport(
     KafkaMicroBatchReaderFactory
   }
 
-  // TODO: figure out the life cycle of custom metrics, and make this method take `ScanConfig` as
-  // a parameter.
-  override def getCustomMetrics(): CustomMetrics = {
-    KafkaCustomMetrics(
-      kafkaOffsetReader.fetchLatestOffsets(), endPartitionOffsets.partitionToOffsets)
-  }
-
   override def deserializeOffset(json: String): Offset = {
     KafkaSourceOffset(JsonUtils.partitionOffsets(json))
   }
@@ -384,18 +375,3 @@ private[kafka010] case class KafkaMicroBatchPartitionReader(
     }
   }
 }
-
-/**
- * Currently reports per topic-partition lag.
- * This is the difference between the offset of the latest available data
- * in a topic-partition and the latest offset that has been processed.
- */
-private[kafka010] case class KafkaCustomMetrics(
-    latestOffsets: Map[TopicPartition, Long],
-    processedOffsets: Map[TopicPartition, Long]) extends CustomMetrics {
-  override def json(): String = {
-    JsonUtils.partitionLags(latestOffsets, processedOffsets)
-  }
-
-  override def toString: String = json()
-}
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index 78249f7a80fb5..8e246dbbf5d70 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -30,8 +30,6 @@ import scala.util.Random
 
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}
 import org.apache.kafka.common.TopicPartition
-import org.json4s.DefaultFormats
-import org.json4s.jackson.JsonMethods._
 import org.scalatest.concurrent.PatienceConfiguration.Timeout
 import org.scalatest.time.SpanSugar._
 
@@ -958,41 +956,6 @@ class KafkaMicroBatchV2SourceSuite extends KafkaMicroBatchSourceSuiteBase {
     intercept[IllegalArgumentException] { test(minPartitions = "-1", 1, true) }
 
-  test("custom lag metrics") {
-    import testImplicits._
-    val topic = newTopic()
-    testUtils.createTopic(topic, partitions = 2)
-    testUtils.sendMessages(topic, (1 to 100).map(_.toString).toArray)
-    require(testUtils.getLatestOffsets(Set(topic)).size === 2)
-
-    val kafka = spark
-      .readStream
-      .format("kafka")
-      .option("subscribe", topic)
-      .option("startingOffsets", s"earliest")
-      .option("maxOffsetsPerTrigger", 10)
-      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
-      .load()
-      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
-      .as[(String, String)]
-
-    implicit val formats = DefaultFormats
-
-    val mapped = kafka.map(kv => kv._2.toInt + 1)
-    testStream(mapped)(
-      StartStream(trigger = OneTimeTrigger),
-      AssertOnQuery { query =>
-        query.awaitTermination()
-        val source = query.lastProgress.sources(0)
-        // masOffsetsPerTrigger is 10, and there are two partitions containing 50 events each
-        // so 5 events should be processed from each partition and a lag of 45 events
-        val custom = parse(source.customMetrics)
-          .extract[Map[String, Map[String, Map[String, Long]]]]
-        custom("lag")(topic)("0") == 45 && custom("lag")(topic)("1") == 45
-      }
-    )
-  }
-
 }
 
 abstract class KafkaSourceSuiteBase extends KafkaSourceTest {
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/CustomMetrics.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/CustomMetrics.java
deleted file mode 100644
index 7011a70e515e2..0000000000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/CustomMetrics.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2;
-
-import org.apache.spark.annotation.InterfaceStability;
-
-/**
- * An interface for reporting custom metrics from streaming sources and sinks
- */
-@InterfaceStability.Evolving
-public interface CustomMetrics {
-  /**
-   * Returns a JSON serialized representation of custom metrics
-   *
-   * @return JSON serialized representation of custom metrics
-   */
-  String json();
-}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/SupportsCustomReaderMetrics.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/SupportsCustomReaderMetrics.java
deleted file mode 100644
index 8693154cb7045..0000000000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/SupportsCustomReaderMetrics.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2.reader.streaming;
-
-import org.apache.spark.annotation.InterfaceStability;
-import org.apache.spark.sql.sources.v2.CustomMetrics;
-
-/**
- * A mix in interface for {@link StreamingReadSupport}. Data sources can implement this interface
- * to report custom metrics that gets reported under the
- * {@link org.apache.spark.sql.streaming.SourceProgress}
- */
-@InterfaceStability.Evolving
-public interface SupportsCustomReaderMetrics extends StreamingReadSupport {
-
-  /**
-   * Returns custom metrics specific to this data source.
-   */
-  CustomMetrics getCustomMetrics();
-
-  /**
-   * Invoked if the custom metrics returned by {@link #getCustomMetrics()} is invalid
-   * (e.g. Invalid data that cannot be parsed). Throwing an error here would ensure that
-   * your custom metrics work right and correct values are reported always. The default action
-   * on invalid metrics is to ignore it.
-   *
-   * @param ex the exception
-   */
-  default void onInvalidMetrics(Exception ex) {
-    // default is to ignore invalid custom metrics
-  }
-}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/SupportsCustomWriterMetrics.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/SupportsCustomWriterMetrics.java
deleted file mode 100644
index 2b018c7d123bb..0000000000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/SupportsCustomWriterMetrics.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2.writer.streaming;
-
-import org.apache.spark.annotation.InterfaceStability;
-import org.apache.spark.sql.sources.v2.CustomMetrics;
-
-/**
- * A mix in interface for {@link StreamingWriteSupport}. Data sources can implement this interface
- * to report custom metrics that gets reported under the
- * {@link org.apache.spark.sql.streaming.SinkProgress}
- */
-@InterfaceStability.Evolving
-public interface SupportsCustomWriterMetrics extends StreamingWriteSupport {
-
-  /**
-   * Returns custom metrics specific to this data source.
-   */
-  CustomMetrics getCustomMetrics();
-
-  /**
-   * Invoked if the custom metrics returned by {@link #getCustomMetrics()} is invalid
-   * (e.g. Invalid data that cannot be parsed). Throwing an error here would ensure that
-   * your custom metrics work right and correct values are reported always. The default action
-   * on invalid metrics is to ignore it.
-   *
-   * @param ex the exception
-   */
-  default void onInvalidMetrics(Exception ex) {
-    // default is to ignore invalid custom metrics
-  }
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
index 417b6b39366ae..d4b50655c7215 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
@@ -22,20 +22,14 @@ import java.util.{Date, UUID}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
-import scala.util.control.NonFatal
-
-import org.json4s.jackson.JsonMethods.parse
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.plans.logical.{EventTimeWatermark, LogicalPlan}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.QueryExecution
-import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2ScanExec, WriteToDataSourceV2Exec}
-import org.apache.spark.sql.execution.streaming.sources.MicroBatchWritSupport
-import org.apache.spark.sql.sources.v2.CustomMetrics
-import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReadSupport, SupportsCustomReaderMetrics}
-import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingWriteSupport, SupportsCustomWriterMetrics}
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec
+import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReadSupport
 import org.apache.spark.sql.streaming._
 import org.apache.spark.sql.streaming.StreamingQueryListener.QueryProgressEvent
 import org.apache.spark.util.Clock
@@ -162,31 +156,7 @@ trait ProgressReporter extends Logging {
     }
     logDebug(s"Execution stats: $executionStats")
 
-    // extracts and validates custom metrics from readers and writers
-    def extractMetrics(
-        getMetrics: () => Option[CustomMetrics],
-        onInvalidMetrics: (Exception) => Unit): Option[String] = {
-      try {
-        getMetrics().map(m => {
-          val json = m.json()
-          parse(json)
-          json
-        })
-      } catch {
-        case ex: Exception if NonFatal(ex) =>
-          onInvalidMetrics(ex)
-          None
-      }
-    }
-
     val sourceProgress = sources.distinct.map { source =>
-      val customReaderMetrics = source match {
-        case s: SupportsCustomReaderMetrics =>
-          extractMetrics(() => Option(s.getCustomMetrics), s.onInvalidMetrics)
-
-        case _ => None
-      }
-
       val numRecords = executionStats.inputRows.getOrElse(source, 0L)
       new SourceProgress(
         description = source.toString,
@@ -194,19 +164,11 @@ trait ProgressReporter extends Logging {
         endOffset = currentTriggerEndOffsets.get(source).orNull,
         numInputRows = numRecords,
         inputRowsPerSecond = numRecords / inputTimeSec,
-        processedRowsPerSecond = numRecords / processingTimeSec,
-        customReaderMetrics.orNull
+        processedRowsPerSecond = numRecords / processingTimeSec
       )
     }
 
-    val customWriterMetrics = extractWriteSupport() match {
-      case Some(s: SupportsCustomWriterMetrics) =>
-        extractMetrics(() => Option(s.getCustomMetrics), s.onInvalidMetrics)
-
-      case _ => None
-    }
-
-    val sinkProgress = new SinkProgress(sink.toString, customWriterMetrics.orNull)
+    val sinkProgress = new SinkProgress(sink.toString)
 
     val newProgress = new StreamingQueryProgress(
       id = id,
@@ -235,18 +197,6 @@ trait ProgressReporter extends Logging {
     currentStatus = currentStatus.copy(isTriggerActive = false)
   }
 
-  /** Extract writer from the executed query plan. */
-  private def extractWriteSupport(): Option[StreamingWriteSupport] = {
-    if (lastExecution == null) return None
-    lastExecution.executedPlan.collect {
-      case p if p.isInstanceOf[WriteToDataSourceV2Exec] =>
-        p.asInstanceOf[WriteToDataSourceV2Exec].writeSupport
-    }.headOption match {
-      case Some(w: MicroBatchWritSupport) => Some(w.writeSupport)
-      case _ => None
-    }
-  }
-
   /** Extract statistics about stateful operators from the executed query plan. */
   private def extractStateOperatorMetrics(hasNewData: Boolean): Seq[StateOperatorProgress] = {
     if (lastExecution == null) return Nil
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala
index 2509450f0da9d..c50dc7bcb8da1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala
@@ -23,9 +23,6 @@ import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 import scala.util.control.NonFatal
 
-import org.json4s.NoTypeHints
-import org.json4s.jackson.Serialization
-
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.InternalRow
@@ -35,9 +32,9 @@ import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
 import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils
 import org.apache.spark.sql.catalyst.streaming.InternalOutputModes.{Append, Complete, Update}
 import org.apache.spark.sql.execution.streaming.{MemorySinkBase, Sink}
-import org.apache.spark.sql.sources.v2.{CustomMetrics, DataSourceOptions, DataSourceV2, StreamingWriteSupportProvider}
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, StreamingWriteSupportProvider}
 import org.apache.spark.sql.sources.v2.writer._
-import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWriteSupport, SupportsCustomWriterMetrics}
+import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWriteSupport}
 import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types.StructType
 
@@ -119,26 +116,15 @@ class MemorySinkV2 extends DataSourceV2 with StreamingWriteSupportProvider
     batches.clear()
   }
 
-  def numRows: Int = synchronized {
-    batches.foldLeft(0)(_ + _.data.length)
-  }
-
   override def toString(): String = "MemorySinkV2"
 }
 
 case class MemoryWriterCommitMessage(partition: Int, data: Seq[Row]) extends WriterCommitMessage {}
 
-class MemoryV2CustomMetrics(sink: MemorySinkV2) extends CustomMetrics {
-  private implicit val formats = Serialization.formats(NoTypeHints)
-  override def json(): String = Serialization.write(Map("numRows" -> sink.numRows))
-}
-
 class MemoryStreamingWriteSupport(
     val sink: MemorySinkV2, outputMode: OutputMode, schema: StructType)
-  extends StreamingWriteSupport with SupportsCustomWriterMetrics {
-
-  private val customMemoryV2Metrics = new MemoryV2CustomMetrics(sink)
+  extends StreamingWriteSupport {
 
   override def createStreamingWriterFactory: MemoryWriterFactory = {
     MemoryWriterFactory(outputMode, schema)
@@ -154,8 +140,6 @@ class MemoryStreamingWriteSupport(
   override def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {
     // Don't accept any of the new input.
   }
-
-  override def getCustomMetrics: CustomMetrics = customMemoryV2Metrics
 }
 
 case class MemoryWriterFactory(outputMode: OutputMode, schema: StructType)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
index cf9375d39b39d..f2173aa1e59c2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
@@ -172,27 +172,7 @@ class SourceProgress protected[sql](
     val endOffset: String,
     val numInputRows: Long,
     val inputRowsPerSecond: Double,
-    val processedRowsPerSecond: Double,
-    val customMetrics: String) extends Serializable {
-
-  /** SourceProgress without custom metrics. */
-  protected[sql] def this(
-      description: String,
-      startOffset: String,
-      endOffset: String,
-      numInputRows: Long,
-      inputRowsPerSecond: Double,
-      processedRowsPerSecond: Double) {
-
-    this(
-      description,
-      startOffset,
-      endOffset,
-      numInputRows,
-      inputRowsPerSecond,
-      processedRowsPerSecond,
-      null)
-  }
+    val processedRowsPerSecond: Double) extends Serializable {
 
   /** The compact JSON representation of this progress. */
   def json: String = compact(render(jsonValue))
@@ -207,18 +187,12 @@ class SourceProgress protected[sql](
       if (value.isNaN || value.isInfinity) JNothing else JDouble(value)
     }
 
-    val jsonVal = ("description" -> JString(description)) ~
+    ("description" -> JString(description)) ~
       ("startOffset" -> tryParse(startOffset)) ~
       ("endOffset" -> tryParse(endOffset)) ~
       ("numInputRows" -> JInt(numInputRows)) ~
       ("inputRowsPerSecond" -> safeDoubleToJValue(inputRowsPerSecond)) ~
       ("processedRowsPerSecond" -> safeDoubleToJValue(processedRowsPerSecond))
-
-    if (customMetrics != null) {
-      jsonVal ~ ("customMetrics" -> parse(customMetrics))
-    } else {
-      jsonVal
-    }
   }
 
   private def tryParse(json: String) = try {
@@ -237,13 +211,7 @@ class SourceProgress protected[sql](
  */
 @InterfaceStability.Evolving
 class SinkProgress protected[sql](
-    val description: String,
-    val customMetrics: String) extends Serializable {
-
-  /** SinkProgress without custom metrics. */
-  protected[sql] def this(description: String) {
-    this(description, null)
-  }
+    val description: String) extends Serializable {
 
   /** The compact JSON representation of this progress. */
   def json: String = compact(render(jsonValue))
@@ -254,12 +222,6 @@ class SinkProgress protected[sql](
   override def toString: String = prettyJson
 
   private[sql] def jsonValue: JValue = {
-    val jsonVal = ("description" -> JString(description))
-
-    if (customMetrics != null) {
-      jsonVal ~ ("customMetrics" -> parse(customMetrics))
-    } else {
-      jsonVal
-    }
+    ("description" -> JString(description))
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkV2Suite.scala
index 50f13bee251ea..61857365ac989 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkV2Suite.scala
@@ -63,25 +63,4 @@ class MemorySinkV2Suite extends StreamTest with BeforeAndAfter {
     assert(sink.allData.map(_.getInt(0)).sorted == Seq(1, 2, 3, 4, 6, 7, 11, 22, 33))
   }
-
-  test("writer metrics") {
-    val sink = new MemorySinkV2
-    val schema = new StructType().add("i", "int")
-    val writeSupport = new MemoryStreamingWriteSupport(
-      sink, OutputMode.Append(), schema)
-    // batch 0
-    writeSupport.commit(0,
-      Array(
-        MemoryWriterCommitMessage(0, Seq(Row(1), Row(2))),
-        MemoryWriterCommitMessage(1, Seq(Row(3), Row(4))),
-        MemoryWriterCommitMessage(2, Seq(Row(5), Row(6)))
-      ))
-    assert(writeSupport.getCustomMetrics.json() == "{\"numRows\":6}")
-    // batch 1
-    writeSupport.commit(1,
-      Array(
-        MemoryWriterCommitMessage(0, Seq(Row(7), Row(8)))
-      ))
-    assert(writeSupport.getCustomMetrics.json() == "{\"numRows\":8}")
-  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index 73592526fb0f7..1dd817545a969 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -22,8 +22,6 @@ import java.util.concurrent.CountDownLatch
 import scala.collection.mutable
 
 import org.apache.commons.lang3.RandomStringUtils
-import org.json4s.NoTypeHints
-import org.json4s.jackson.Serialization
 import org.scalactic.TolerantNumerics
 import org.scalatest.BeforeAndAfter
 import org.scalatest.concurrent.PatienceConfiguration.Timeout
@@ -457,31 +455,6 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
     }
   }
 
-  test("Check if custom metrics are reported") {
-    val streamInput = MemoryStream[Int]
-    implicit val formats = Serialization.formats(NoTypeHints)
-    testStream(streamInput.toDF(), useV2Sink = true)(
-      AddData(streamInput, 1, 2, 3),
-      CheckAnswer(1, 2, 3),
-      AssertOnQuery { q =>
-        val lastProgress = getLastProgressWithData(q)
-        assert(lastProgress.nonEmpty)
-        assert(lastProgress.get.numInputRows == 3)
-        assert(lastProgress.get.sink.customMetrics == "{\"numRows\":3}")
-        true
-      },
-      AddData(streamInput, 4, 5, 6, 7),
-      CheckAnswer(1, 2, 3, 4, 5, 6, 7),
-      AssertOnQuery { q =>
-        val lastProgress = getLastProgressWithData(q)
-        assert(lastProgress.nonEmpty)
-        assert(lastProgress.get.numInputRows == 4)
-        assert(lastProgress.get.sink.customMetrics == "{\"numRows\":7}")
-        true
-      }
-    )
-  }
-
   test("input row calculation with same V1 source used twice in self-join") {
     val streamingTriggerDF = spark.createDataset(1 to 10).toDF
     val streamingInputDF = createSingleTriggerStreamingDF(streamingTriggerDF).toDF("value")
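
For reference, the offset JSON handled by JsonUtils.partitionOffsets, which this patch keeps (and which deserializeOffset above parses back into a KafkaSourceOffset), has the shape topic -> partition -> offset. The following is a minimal sketch only, not part of the patch: the topic name and partition counts are hypothetical, and the exact key order in the output is not guaranteed because the result is built from mutable HashMaps.

  // Sketch (assumed example, not from the patch): serializing per-partition offsets.
  import org.apache.kafka.common.TopicPartition

  val offsets = Map(
    new TopicPartition("topicA", 0) -> 23L,  // hypothetical topic and partitions
    new TopicPartition("topicA", 1) -> -1L)

  // JsonUtils.partitionOffsets(offsets) yields something like:
  //   {"topicA":{"0":23,"1":-1}}
  // and the String overload, JsonUtils.partitionOffsets("""{"topicA":{"0":23,"1":-1}}"""),
  // parses that JSON back into a Map[TopicPartition, Long].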