[SPARK-25336][SS]Revert SPARK-24863 and SPARK-24748 #22334

Closed · wants to merge 2 commits
@@ -29,11 +29,6 @@ import org.json4s.jackson.Serialization
*/
private object JsonUtils {
private implicit val formats = Serialization.formats(NoTypeHints)
implicit val ordering = new Ordering[TopicPartition] {
override def compare(x: TopicPartition, y: TopicPartition): Int = {
Ordering.Tuple2[String, Int].compare((x.topic, x.partition), (y.topic, y.partition))
}
}

/**
* Read TopicPartitions from json string
@@ -56,7 +51,7 @@ private object JsonUtils {
* Write TopicPartitions as json string
*/
def partitions(partitions: Iterable[TopicPartition]): String = {
val result = HashMap.empty[String, List[Int]]
val result = new HashMap[String, List[Int]]
partitions.foreach { tp =>
val parts: List[Int] = result.getOrElse(tp.topic, Nil)
result += tp.topic -> (tp.partition::parts)
@@ -85,31 +80,19 @@ private object JsonUtils {
* Write per-TopicPartition offsets as json string
*/
def partitionOffsets(partitionOffsets: Map[TopicPartition, Long]): String = {
val result = HashMap.empty[String, HashMap[Int, Long]]
val result = new HashMap[String, HashMap[Int, Long]]()
implicit val ordering = new Ordering[TopicPartition] {
override def compare(x: TopicPartition, y: TopicPartition): Int = {
Ordering.Tuple2[String, Int].compare((x.topic, x.partition), (y.topic, y.partition))
}
}
val partitions = partitionOffsets.keySet.toSeq.sorted // sort for more determinism
partitions.foreach { tp =>
val off = partitionOffsets(tp)
val parts = result.getOrElse(tp.topic, HashMap.empty[Int, Long])
val parts = result.getOrElse(tp.topic, new HashMap[Int, Long])
parts += tp.partition -> off
result += tp.topic -> parts
}
Serialization.write(result)
}

/**
* Write per-topic partition lag as json string
*/
def partitionLags(
latestOffsets: Map[TopicPartition, Long],
processedOffsets: Map[TopicPartition, Long]): String = {
val result = HashMap.empty[String, HashMap[Int, Long]]
val partitions = latestOffsets.keySet.toSeq.sorted
partitions.foreach { tp =>
val lag = latestOffsets(tp) - processedOffsets.getOrElse(tp, 0L)
val parts = result.getOrElse(tp.topic, HashMap.empty[Int, Long])
parts += tp.partition -> lag
result += tp.topic -> parts
}
Serialization.write(Map("lag" -> result))
}
}
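
For reference, the helpers in the JsonUtils diff above serialize offsets as a topic → partition → offset map (sorted by topic and partition for deterministic output), and the removed partitionLags wrapped the same shape under a top-level "lag" key. A minimal standalone sketch of those JSON shapes, not the reverted helpers themselves (json4s only; the topic name and offset values are hypothetical):

```scala
import org.apache.kafka.common.TopicPartition
import org.json4s.NoTypeHints
import org.json4s.jackson.Serialization

object JsonShapeSketch {
  private implicit val formats = Serialization.formats(NoTypeHints)

  def main(args: Array[String]): Unit = {
    // Hypothetical offsets for a two-partition topic "t".
    val offsets = Map(
      new TopicPartition("t", 0) -> 23L,
      new TopicPartition("t", 1) -> 45L)

    // Same nesting as JsonUtils.partitionOffsets: topic -> (partition -> offset).
    val byTopic: Map[String, Map[String, Long]] =
      offsets.groupBy(_._1.topic).map { case (topic, tps) =>
        topic -> tps.map { case (tp, off) => tp.partition.toString -> off }
      }

    println(Serialization.write(byTopic))                // {"t":{"0":23,"1":45}}
    println(Serialization.write(Map("lag" -> byTopic)))  // shape used by the removed partitionLags
  }
}
```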
@@ -22,7 +22,6 @@ import java.io._
import java.nio.charset.StandardCharsets

import org.apache.commons.io.IOUtils
import org.apache.kafka.common.TopicPartition

import org.apache.spark.SparkEnv
import org.apache.spark.internal.Logging
@@ -33,9 +32,9 @@ import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, SerializedOffset, SimpleStreamingScanConfig, SimpleStreamingScanConfigBuilder}
import org.apache.spark.sql.execution.streaming.sources.RateControlMicroBatchReadSupport
import org.apache.spark.sql.kafka010.KafkaSourceProvider.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE}
import org.apache.spark.sql.sources.v2.{CustomMetrics, DataSourceOptions}
import org.apache.spark.sql.sources.v2.DataSourceOptions
import org.apache.spark.sql.sources.v2.reader._
import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReadSupport, Offset, SupportsCustomReaderMetrics}
import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReadSupport, Offset}
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.UninterruptibleThread

@@ -61,8 +60,7 @@ private[kafka010] class KafkaMicroBatchReadSupport(
options: DataSourceOptions,
metadataPath: String,
startingOffsets: KafkaOffsetRangeLimit,
failOnDataLoss: Boolean)
extends RateControlMicroBatchReadSupport with SupportsCustomReaderMetrics with Logging {
failOnDataLoss: Boolean) extends RateControlMicroBatchReadSupport with Logging {

private val pollTimeoutMs = options.getLong(
"kafkaConsumer.pollTimeoutMs",
@@ -156,13 +154,6 @@ private[kafka010] class KafkaMicroBatchReadSupport(
KafkaMicroBatchReaderFactory
}

// TODO: figure out the life cycle of custom metrics, and make this method take `ScanConfig` as
// a parameter.
override def getCustomMetrics(): CustomMetrics = {
KafkaCustomMetrics(
kafkaOffsetReader.fetchLatestOffsets(), endPartitionOffsets.partitionToOffsets)
}

override def deserializeOffset(json: String): Offset = {
KafkaSourceOffset(JsonUtils.partitionOffsets(json))
}
@@ -384,18 +375,3 @@ private[kafka010] case class KafkaMicroBatchPartitionReader(
}
}
}

/**
* Currently reports per topic-partition lag.
* This is the difference between the offset of the latest available data
* in a topic-partition and the latest offset that has been processed.
*/
private[kafka010] case class KafkaCustomMetrics(
latestOffsets: Map[TopicPartition, Long],
processedOffsets: Map[TopicPartition, Long]) extends CustomMetrics {
override def json(): String = {
JsonUtils.partitionLags(latestOffsets, processedOffsets)
}

override def toString: String = json()
}
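
The removed KafkaCustomMetrics reported, per topic-partition, the difference between the latest available offset and the latest processed offset. A self-contained sketch of that computation, not the reverted class itself (kafka-clients only; topic and offsets are made up):

```scala
import org.apache.kafka.common.TopicPartition

object LagSketch {
  // Lag per partition = latest available offset - latest processed offset;
  // partitions with no processed offset yet count from 0, as in the removed partitionLags.
  def lags(latest: Map[TopicPartition, Long],
           processed: Map[TopicPartition, Long]): Map[TopicPartition, Long] =
    latest.map { case (tp, end) => tp -> (end - processed.getOrElse(tp, 0L)) }

  def main(args: Array[String]): Unit = {
    // Hypothetical state: 50 records available per partition, 5 processed so far.
    val latest    = Map(new TopicPartition("t", 0) -> 50L, new TopicPartition("t", 1) -> 50L)
    val processed = Map(new TopicPartition("t", 0) -> 5L,  new TopicPartition("t", 1) -> 5L)
    println(lags(latest, processed)) // Map(t-0 -> 45, t-1 -> 45)
  }
}
```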
@@ -30,8 +30,6 @@ import scala.util.Random

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}
import org.apache.kafka.common.TopicPartition
import org.json4s.DefaultFormats
import org.json4s.jackson.JsonMethods._
import org.scalatest.concurrent.PatienceConfiguration.Timeout
import org.scalatest.time.SpanSugar._

@@ -958,41 +956,6 @@ class KafkaMicroBatchV2SourceSuite extends KafkaMicroBatchSourceSuiteBase {
intercept[IllegalArgumentException] { test(minPartitions = "-1", 1, true) }
}

test("custom lag metrics") {
import testImplicits._
val topic = newTopic()
testUtils.createTopic(topic, partitions = 2)
testUtils.sendMessages(topic, (1 to 100).map(_.toString).toArray)
require(testUtils.getLatestOffsets(Set(topic)).size === 2)

val kafka = spark
.readStream
.format("kafka")
.option("subscribe", topic)
.option("startingOffsets", s"earliest")
.option("maxOffsetsPerTrigger", 10)
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.load()
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]

implicit val formats = DefaultFormats

val mapped = kafka.map(kv => kv._2.toInt + 1)
testStream(mapped)(
StartStream(trigger = OneTimeTrigger),
AssertOnQuery { query =>
query.awaitTermination()
val source = query.lastProgress.sources(0)
// maxOffsetsPerTrigger is 10, and there are two partitions containing 50 events each,
// so 5 events should be processed from each partition, leaving a lag of 45 events
val custom = parse(source.customMetrics)
.extract[Map[String, Map[String, Map[String, Long]]]]
custom("lag")(topic)("0") == 45 && custom("lag")(topic)("1") == 45
}
)
}

}
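
Worked numbers for the removed lag test above, assuming (as its in-test comment states) the 100 messages land evenly across the two partitions: each partition holds 50 records; with maxOffsetsPerTrigger = 10 the single trigger processes 5 records per partition, so the reported lag is 50 - 5 = 45 for both partition 0 and partition 1.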

abstract class KafkaSourceSuiteBase extends KafkaSourceTest {

This file was deleted.

This file was deleted.

This file was deleted.

@@ -22,20 +22,14 @@ import java.util.{Date, UUID}

import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.util.control.NonFatal

import org.json4s.jackson.JsonMethods.parse

import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.{EventTimeWatermark, LogicalPlan}
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2ScanExec, WriteToDataSourceV2Exec}
import org.apache.spark.sql.execution.streaming.sources.MicroBatchWritSupport
import org.apache.spark.sql.sources.v2.CustomMetrics
import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReadSupport, SupportsCustomReaderMetrics}
import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingWriteSupport, SupportsCustomWriterMetrics}
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec
import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReadSupport
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.streaming.StreamingQueryListener.QueryProgressEvent
import org.apache.spark.util.Clock
@@ -162,51 +156,19 @@ trait ProgressReporter extends Logging {
}
logDebug(s"Execution stats: $executionStats")

// extracts and validates custom metrics from readers and writers
def extractMetrics(
getMetrics: () => Option[CustomMetrics],
onInvalidMetrics: (Exception) => Unit): Option[String] = {
try {
getMetrics().map(m => {
val json = m.json()
parse(json)
json
})
} catch {
case ex: Exception if NonFatal(ex) =>
onInvalidMetrics(ex)
None
}
}

val sourceProgress = sources.distinct.map { source =>
val customReaderMetrics = source match {
case s: SupportsCustomReaderMetrics =>
extractMetrics(() => Option(s.getCustomMetrics), s.onInvalidMetrics)

case _ => None
}

val numRecords = executionStats.inputRows.getOrElse(source, 0L)
new SourceProgress(
description = source.toString,
startOffset = currentTriggerStartOffsets.get(source).orNull,
endOffset = currentTriggerEndOffsets.get(source).orNull,
numInputRows = numRecords,
inputRowsPerSecond = numRecords / inputTimeSec,
processedRowsPerSecond = numRecords / processingTimeSec,
customReaderMetrics.orNull
processedRowsPerSecond = numRecords / processingTimeSec
)
}

val customWriterMetrics = extractWriteSupport() match {
case Some(s: SupportsCustomWriterMetrics) =>
extractMetrics(() => Option(s.getCustomMetrics), s.onInvalidMetrics)

case _ => None
}

val sinkProgress = new SinkProgress(sink.toString, customWriterMetrics.orNull)
val sinkProgress = new SinkProgress(sink.toString)

val newProgress = new StreamingQueryProgress(
id = id,
Expand Down Expand Up @@ -235,18 +197,6 @@ trait ProgressReporter extends Logging {
currentStatus = currentStatus.copy(isTriggerActive = false)
}

/** Extract writer from the executed query plan. */
private def extractWriteSupport(): Option[StreamingWriteSupport] = {
if (lastExecution == null) return None
lastExecution.executedPlan.collect {
case p if p.isInstanceOf[WriteToDataSourceV2Exec] =>
p.asInstanceOf[WriteToDataSourceV2Exec].writeSupport
}.headOption match {
case Some(w: MicroBatchWritSupport) => Some(w.writeSupport)
case _ => None
}
}

/** Extract statistics about stateful operators from the executed query plan. */
private def extractStateOperatorMetrics(hasNewData: Boolean): Seq[StateOperatorProgress] = {
if (lastExecution == null) return Nil
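
For context on the ProgressReporter change: the removed extractMetrics helper only attached reader or writer metrics to the progress event if their json() output parsed cleanly, otherwise it invoked onInvalidMetrics and dropped them. A minimal standalone sketch of that validate-or-drop guard (json4s only; the helper name is hypothetical):

```scala
import scala.util.control.NonFatal
import org.json4s.jackson.JsonMethods.parse

object MetricsValidationSketch {
  // Keep the metrics string only if it is well-formed JSON; otherwise hand the
  // error to the caller and drop the metrics, mirroring the removed extractMetrics.
  def validateMetricsJson(json: String, onInvalid: Exception => Unit): Option[String] =
    try {
      parse(json) // throws on malformed input
      Some(json)
    } catch {
      case ex: Exception if NonFatal(ex) =>
        onInvalid(ex)
        None
    }

  def main(args: Array[String]): Unit = {
    println(validateMetricsJson("""{"lag":{"t":{"0":45}}}""", _ => ()))       // Some(...)
    println(validateMetricsJson("not json", e => println(s"dropped: $e")))    // None
  }
}
```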