## What changes were proposed in this pull request?

Revert SPARK-24863 (#21819) and SPARK-24748 (#21721) as per discussion in #21721. We will revisit them when the data source v2 APIs are out.

## How was this patch tested?

Jenkins

Closes #22334 from zsxwing/revert-SPARK-24863-SPARK-24748.

Authored-by: Shixiong Zhu <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
zsxwing authored and cloud-fan committed Sep 5, 2018
1 parent ca861fe commit 2119e51
Showing 11 changed files with 22 additions and 379 deletions.
@@ -29,11 +29,6 @@ import org.json4s.jackson.Serialization
  */
 private object JsonUtils {
   private implicit val formats = Serialization.formats(NoTypeHints)
-  implicit val ordering = new Ordering[TopicPartition] {
-    override def compare(x: TopicPartition, y: TopicPartition): Int = {
-      Ordering.Tuple2[String, Int].compare((x.topic, x.partition), (y.topic, y.partition))
-    }
-  }
 
   /**
    * Read TopicPartitions from json string
@@ -56,7 +51,7 @@ private object JsonUtils {
    * Write TopicPartitions as json string
    */
   def partitions(partitions: Iterable[TopicPartition]): String = {
-    val result = HashMap.empty[String, List[Int]]
+    val result = new HashMap[String, List[Int]]
     partitions.foreach { tp =>
       val parts: List[Int] = result.getOrElse(tp.topic, Nil)
       result += tp.topic -> (tp.partition::parts)
@@ -85,31 +80,19 @@ private object JsonUtils {
    * Write per-TopicPartition offsets as json string
    */
   def partitionOffsets(partitionOffsets: Map[TopicPartition, Long]): String = {
-    val result = HashMap.empty[String, HashMap[Int, Long]]
+    val result = new HashMap[String, HashMap[Int, Long]]()
+    implicit val ordering = new Ordering[TopicPartition] {
+      override def compare(x: TopicPartition, y: TopicPartition): Int = {
+        Ordering.Tuple2[String, Int].compare((x.topic, x.partition), (y.topic, y.partition))
+      }
+    }
     val partitions = partitionOffsets.keySet.toSeq.sorted // sort for more determinism
     partitions.foreach { tp =>
       val off = partitionOffsets(tp)
-      val parts = result.getOrElse(tp.topic, HashMap.empty[Int, Long])
+      val parts = result.getOrElse(tp.topic, new HashMap[Int, Long])
       parts += tp.partition -> off
       result += tp.topic -> parts
     }
     Serialization.write(result)
   }
-
-  /**
-   * Write per-topic partition lag as json string
-   */
-  def partitionLags(
-      latestOffsets: Map[TopicPartition, Long],
-      processedOffsets: Map[TopicPartition, Long]): String = {
-    val result = HashMap.empty[String, HashMap[Int, Long]]
-    val partitions = latestOffsets.keySet.toSeq.sorted
-    partitions.foreach { tp =>
-      val lag = latestOffsets(tp) - processedOffsets.getOrElse(tp, 0L)
-      val parts = result.getOrElse(tp.topic, HashMap.empty[Int, Long])
-      parts += tp.partition -> lag
-      result += tp.topic -> parts
-    }
-    Serialization.write(Map("lag" -> result))
-  }
 }
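
For reference, the surviving `partitions`/`partitionOffsets` helpers above serialize offsets as topic -> (partition -> value) JSON, and the removed `partitionLags` wrapped that same shape under a top-level `lag` key. Below is a minimal standalone sketch (not part of this patch; the object name is made up, and it assumes `kafka-clients` and `json4s-jackson` are on the classpath) showing the JSON these helpers produce:

```scala
import scala.collection.mutable.HashMap

import org.apache.kafka.common.TopicPartition
import org.json4s.NoTypeHints
import org.json4s.jackson.Serialization

object OffsetsJsonSketch {
  private implicit val formats = Serialization.formats(NoTypeHints)

  def main(args: Array[String]): Unit = {
    val offsets = Map(
      new TopicPartition("topic-a", 0) -> 100L,
      new TopicPartition("topic-a", 1) -> 250L)

    // Same grouping as partitionOffsets above: topic -> (partition -> offset).
    val result = new HashMap[String, HashMap[Int, Long]]()
    offsets.foreach { case (tp, off) =>
      val parts = result.getOrElse(tp.topic, new HashMap[Int, Long])
      parts += tp.partition -> off
      result += tp.topic -> parts
    }

    println(Serialization.write(result))               // e.g. {"topic-a":{"0":100,"1":250}}
    println(Serialization.write(Map("lag" -> result))) // the shape the removed partitionLags produced
  }
}
```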
@@ -22,7 +22,6 @@ import java.io._
 import java.nio.charset.StandardCharsets
 
 import org.apache.commons.io.IOUtils
-import org.apache.kafka.common.TopicPartition
 
 import org.apache.spark.SparkEnv
 import org.apache.spark.internal.Logging
@@ -33,9 +32,9 @@ import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, SerializedOffset, SimpleStreamingScanConfig, SimpleStreamingScanConfigBuilder}
 import org.apache.spark.sql.execution.streaming.sources.RateControlMicroBatchReadSupport
 import org.apache.spark.sql.kafka010.KafkaSourceProvider.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE}
-import org.apache.spark.sql.sources.v2.{CustomMetrics, DataSourceOptions}
+import org.apache.spark.sql.sources.v2.DataSourceOptions
 import org.apache.spark.sql.sources.v2.reader._
-import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReadSupport, Offset, SupportsCustomReaderMetrics}
+import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReadSupport, Offset}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.UninterruptibleThread
 
@@ -61,8 +60,7 @@ private[kafka010] class KafkaMicroBatchReadSupport(
     options: DataSourceOptions,
     metadataPath: String,
     startingOffsets: KafkaOffsetRangeLimit,
-    failOnDataLoss: Boolean)
-  extends RateControlMicroBatchReadSupport with SupportsCustomReaderMetrics with Logging {
+    failOnDataLoss: Boolean) extends RateControlMicroBatchReadSupport with Logging {
 
   private val pollTimeoutMs = options.getLong(
     "kafkaConsumer.pollTimeoutMs",
@@ -156,13 +154,6 @@ private[kafka010] class KafkaMicroBatchReadSupport(
     KafkaMicroBatchReaderFactory
   }
 
-  // TODO: figure out the life cycle of custom metrics, and make this method take `ScanConfig` as
-  // a parameter.
-  override def getCustomMetrics(): CustomMetrics = {
-    KafkaCustomMetrics(
-      kafkaOffsetReader.fetchLatestOffsets(), endPartitionOffsets.partitionToOffsets)
-  }
-
   override def deserializeOffset(json: String): Offset = {
     KafkaSourceOffset(JsonUtils.partitionOffsets(json))
   }
@@ -384,18 +375,3 @@ private[kafka010] case class KafkaMicroBatchPartitionReader(
     }
   }
 }
-
-/**
- * Currently reports per topic-partition lag.
- * This is the difference between the offset of the latest available data
- * in a topic-partition and the latest offset that has been processed.
- */
-private[kafka010] case class KafkaCustomMetrics(
-    latestOffsets: Map[TopicPartition, Long],
-    processedOffsets: Map[TopicPartition, Long]) extends CustomMetrics {
-  override def json(): String = {
-    JsonUtils.partitionLags(latestOffsets, processedOffsets)
-  }
-
-  override def toString: String = json()
-}
@@ -30,8 +30,6 @@ import scala.util.Random
 
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}
 import org.apache.kafka.common.TopicPartition
-import org.json4s.DefaultFormats
-import org.json4s.jackson.JsonMethods._
 import org.scalatest.concurrent.PatienceConfiguration.Timeout
 import org.scalatest.time.SpanSugar._
 
@@ -958,41 +956,6 @@ class KafkaMicroBatchV2SourceSuite extends KafkaMicroBatchSourceSuiteBase {
     intercept[IllegalArgumentException] { test(minPartitions = "-1", 1, true) }
   }
 
-  test("custom lag metrics") {
-    import testImplicits._
-    val topic = newTopic()
-    testUtils.createTopic(topic, partitions = 2)
-    testUtils.sendMessages(topic, (1 to 100).map(_.toString).toArray)
-    require(testUtils.getLatestOffsets(Set(topic)).size === 2)
-
-    val kafka = spark
-      .readStream
-      .format("kafka")
-      .option("subscribe", topic)
-      .option("startingOffsets", s"earliest")
-      .option("maxOffsetsPerTrigger", 10)
-      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
-      .load()
-      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
-      .as[(String, String)]
-
-    implicit val formats = DefaultFormats
-
-    val mapped = kafka.map(kv => kv._2.toInt + 1)
-    testStream(mapped)(
-      StartStream(trigger = OneTimeTrigger),
-      AssertOnQuery { query =>
-        query.awaitTermination()
-        val source = query.lastProgress.sources(0)
-        // maxOffsetsPerTrigger is 10, and there are two partitions containing 50 events each
-        // so 5 events should be processed from each partition and a lag of 45 events
-        val custom = parse(source.customMetrics)
-          .extract[Map[String, Map[String, Map[String, Long]]]]
-        custom("lag")(topic)("0") == 45 && custom("lag")(topic)("1") == 45
-      }
-    )
-  }
-
 }
 
 abstract class KafkaSourceSuiteBase extends KafkaSourceTest {

This file was deleted.

This file was deleted.

This file was deleted.

@@ -22,20 +22,14 @@ import java.util.{Date, UUID}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
-import scala.util.control.NonFatal
 
-import org.json4s.jackson.JsonMethods.parse
-
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.plans.logical.{EventTimeWatermark, LogicalPlan}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.QueryExecution
-import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2ScanExec, WriteToDataSourceV2Exec}
-import org.apache.spark.sql.execution.streaming.sources.MicroBatchWritSupport
-import org.apache.spark.sql.sources.v2.CustomMetrics
-import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReadSupport, SupportsCustomReaderMetrics}
-import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingWriteSupport, SupportsCustomWriterMetrics}
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec
+import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReadSupport
 import org.apache.spark.sql.streaming._
 import org.apache.spark.sql.streaming.StreamingQueryListener.QueryProgressEvent
 import org.apache.spark.util.Clock
@@ -162,51 +156,19 @@ trait ProgressReporter extends Logging {
     }
     logDebug(s"Execution stats: $executionStats")
 
-    // extracts and validates custom metrics from readers and writers
-    def extractMetrics(
-        getMetrics: () => Option[CustomMetrics],
-        onInvalidMetrics: (Exception) => Unit): Option[String] = {
-      try {
-        getMetrics().map(m => {
-          val json = m.json()
-          parse(json)
-          json
-        })
-      } catch {
-        case ex: Exception if NonFatal(ex) =>
-          onInvalidMetrics(ex)
-          None
-      }
-    }
-
     val sourceProgress = sources.distinct.map { source =>
-      val customReaderMetrics = source match {
-        case s: SupportsCustomReaderMetrics =>
-          extractMetrics(() => Option(s.getCustomMetrics), s.onInvalidMetrics)
-
-        case _ => None
-      }
-
       val numRecords = executionStats.inputRows.getOrElse(source, 0L)
       new SourceProgress(
         description = source.toString,
         startOffset = currentTriggerStartOffsets.get(source).orNull,
        endOffset = currentTriggerEndOffsets.get(source).orNull,
         numInputRows = numRecords,
         inputRowsPerSecond = numRecords / inputTimeSec,
-        processedRowsPerSecond = numRecords / processingTimeSec,
-        customReaderMetrics.orNull
+        processedRowsPerSecond = numRecords / processingTimeSec
       )
     }
 
-    val customWriterMetrics = extractWriteSupport() match {
-      case Some(s: SupportsCustomWriterMetrics) =>
-        extractMetrics(() => Option(s.getCustomMetrics), s.onInvalidMetrics)
-
-      case _ => None
-    }
-
-    val sinkProgress = new SinkProgress(sink.toString, customWriterMetrics.orNull)
+    val sinkProgress = new SinkProgress(sink.toString)
 
     val newProgress = new StreamingQueryProgress(
       id = id,
@@ -235,18 +197,6 @@
     currentStatus = currentStatus.copy(isTriggerActive = false)
   }
 
-  /** Extract writer from the executed query plan. */
-  private def extractWriteSupport(): Option[StreamingWriteSupport] = {
-    if (lastExecution == null) return None
-    lastExecution.executedPlan.collect {
-      case p if p.isInstanceOf[WriteToDataSourceV2Exec] =>
-        p.asInstanceOf[WriteToDataSourceV2Exec].writeSupport
-    }.headOption match {
-      case Some(w: MicroBatchWritSupport) => Some(w.writeSupport)
-      case _ => None
-    }
-  }
-
   /** Extract statistics about stateful operators from the executed query plan. */
   private def extractStateOperatorMetrics(hasNewData: Boolean): Seq[StateOperatorProgress] = {
     if (lastExecution == null) return Nil
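
For context on what is being reverted here: the removed `extractMetrics` helper asked a reader/writer for its `getCustomMetrics` JSON, round-tripped the string through json4s `parse` to confirm it was valid, and fell back to `None` (after calling `onInvalidMetrics`) when it was not. The sketch below is not part of this patch; it restates that validate-or-drop pattern in isolation, with a made-up `CustomMetricsStub` standing in for the deleted `CustomMetrics` interface:

```scala
import scala.util.control.NonFatal

import org.json4s.jackson.JsonMethods.parse

// Stand-in for the removed org.apache.spark.sql.sources.v2.CustomMetrics trait.
trait CustomMetricsStub { def json(): String }

object ExtractMetricsSketch {
  // Mirrors the removed ProgressReporter.extractMetrics: validate the JSON before reporting it.
  def extractMetrics(
      getMetrics: () => Option[CustomMetricsStub],
      onInvalidMetrics: Exception => Unit): Option[String] = {
    try {
      getMetrics().map { m =>
        val json = m.json()
        parse(json) // throws if the connector returned malformed JSON
        json
      }
    } catch {
      case ex: Exception if NonFatal(ex) =>
        onInvalidMetrics(ex)
        None
    }
  }

  def main(args: Array[String]): Unit = {
    val good = () => Some(new CustomMetricsStub { def json() = """{"lag":{"t":{"0":45}}}""" })
    val bad = () => Some(new CustomMetricsStub { def json() = "not json" })
    println(extractMetrics(good, _ => ()))                 // Some({"lag":{"t":{"0":45}}})
    println(extractMetrics(bad, e => println(s"bad: $e"))) // None
  }
}
```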