[SPARK-22912] v2 data source support in MicroBatchExecution #20097
|
@@ -35,6 +35,16 @@ case class DataSourceV2Relation( | |
} | ||
} | ||
|
||
/** | ||
* A specialization of DataSourceV2Relation with the streaming bit set to true. Otherwwise identical | ||
* to the non-streaming relation. | ||
*/ | ||
class StreamingDataSourceV2Relation( | ||
Review comment: Add docs. What is the purpose of this class?
||
fullOutput: Seq[AttributeReference], | ||
reader: DataSourceV2Reader) extends DataSourceV2Relation(fullOutput, reader) { | ||
override def isStreaming: Boolean = true | ||
} | ||
|
||
object DataSourceV2Relation { | ||
def apply(reader: DataSourceV2Reader): DataSourceV2Relation = { | ||
new DataSourceV2Relation(reader.readSchema().toAttributes, reader) | ||
|
|
@@ -17,14 +17,21 @@ | |
|
||
package org.apache.spark.sql.execution.streaming | ||
|
||
import java.util.Optional | ||
|
||
import scala.collection.JavaConverters._ | ||
import scala.collection.mutable.{ArrayBuffer, Map => MutableMap} | ||
|
||
import org.apache.spark.sql.{Dataset, SparkSession} | ||
import org.apache.spark.sql.catalyst.encoders.RowEncoder | ||
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp} | ||
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan} | ||
import org.apache.spark.sql.execution.SQLExecution | ||
import org.apache.spark.sql.sources.v2.streaming.MicroBatchReadSupport | ||
import org.apache.spark.sql.execution.datasources.v2.{StreamingDataSourceV2Relation, WriteToDataSourceV2} | ||
import org.apache.spark.sql.sources.v2 | ||
import org.apache.spark.sql.sources.v2.DataSourceV2Options | ||
import org.apache.spark.sql.sources.v2.streaming.{MicroBatchReadSupport, MicroBatchWriteSupport} | ||
import org.apache.spark.sql.sources.v2.streaming.reader.MicroBatchReader | ||
import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger} | ||
import org.apache.spark.util.{Clock, Utils} | ||
|
||
|
@@ -33,10 +40,11 @@ class MicroBatchExecution( | |
name: String, | ||
checkpointRoot: String, | ||
analyzedPlan: LogicalPlan, | ||
sink: Sink, | ||
sink: BaseStreamingSink, | ||
trigger: Trigger, | ||
triggerClock: Clock, | ||
outputMode: OutputMode, | ||
extraOptions: Map[String, String], | ||
deleteCheckpointOnStop: Boolean) | ||
extends StreamExecution( | ||
sparkSession, name, checkpointRoot, analyzedPlan, sink, | ||
|
@@ -57,26 +65,40 @@ class MicroBatchExecution( | |
var nextSourceId = 0L | ||
val toExecutionRelationMap = MutableMap[StreamingRelation, StreamingExecutionRelation]() | ||
val v2ToExecutionRelationMap = MutableMap[StreamingRelationV2, StreamingExecutionRelation]() | ||
// We transform each distinct streaming relation into a StreamingExecutionRelation, keeping a | ||
// map as we go to ensure each identical relation gets the same StreamingExecutionRelation | ||
// object. For each microbatch, the StreamingExecutionRelation will be replaced with a logical | ||
// plan for the data within that batch. | ||
// Note that we have to use the previous `output` as attributes in StreamingExecutionRelation, | ||
// since the existing logical plan has already used those attributes. The per-microbatch | ||
// transformation is responsible for replacing attributes with their final values. | ||
val _logicalPlan = analyzedPlan.transform { | ||
case streamingRelation@StreamingRelation(dataSource, _, output) => | ||
toExecutionRelationMap.getOrElseUpdate(streamingRelation, { | ||
// Materialize source to avoid creating it in every batch | ||
val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId" | ||
val source = dataSource.createSource(metadataPath) | ||
nextSourceId += 1 | ||
// We still need to use the previous `output` instead of `source.schema` as attributes in | ||
// "df.logicalPlan" has already used attributes of the previous `output`. | ||
StreamingExecutionRelation(source, output)(sparkSession) | ||
}) | ||
case s @ StreamingRelationV2(v2DataSource, _, _, output, v1DataSource) | ||
if !v2DataSource.isInstanceOf[MicroBatchReadSupport] => | ||
case s @ StreamingRelationV2(source: MicroBatchReadSupport, _, options, output, _) => | ||
Review comment: Nit: can you add a comment above this section of the code to explain what it is doing? It took me a while to remember the context.
||
v2ToExecutionRelationMap.getOrElseUpdate(s, { | ||
// Materialize source to avoid creating it in every batch | ||
val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId" | ||
val reader = source.createMicroBatchReader( | ||
Optional.empty(), // user specified schema | ||
metadataPath, | ||
new DataSourceV2Options(options.asJava)) | ||
nextSourceId += 1 | ||
StreamingExecutionRelation(reader, output)(sparkSession) | ||
}) | ||
case s @ StreamingRelationV2(_, _, _, output, v1Relation) => | ||
v2ToExecutionRelationMap.getOrElseUpdate(s, { | ||
// Materialize source to avoid creating it in every batch | ||
val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId" | ||
val source = v1DataSource.createSource(metadataPath) | ||
assert(v1Relation.isDefined, "v2 execution didn't match but v1 was unavailable") | ||
val source = v1Relation.get.dataSource.createSource(metadataPath) | ||
nextSourceId += 1 | ||
// We still need to use the previous `output` instead of `source.schema` as attributes in | ||
// "df.logicalPlan" has already used attributes of the previous `output`. | ||
StreamingExecutionRelation(source, output)(sparkSession) | ||
}) | ||
} | ||
|
@@ -192,7 +214,8 @@ class MicroBatchExecution( | |
source.getBatch(start, end) | ||
} | ||
case nonV1Tuple => | ||
throw new IllegalStateException(s"Unexpected V2 source in $nonV1Tuple") | ||
// The V2 API does not have the same edge case requiring getBatch to be called | ||
// here, so we do nothing here. | ||
} | ||
currentBatchId = latestCommittedBatchId + 1 | ||
committedOffsets ++= availableOffsets | ||
|
@@ -236,14 +259,32 @@ class MicroBatchExecution( | |
val hasNewData = { | ||
awaitProgressLock.lock() | ||
try { | ||
val latestOffsets: Map[Source, Option[Offset]] = uniqueSources.map { | ||
// Generate a map from each unique source to the next available offset. | ||
val latestOffsets: Map[BaseStreamingSource, Option[Offset]] = uniqueSources.map { | ||
Review comment: Nit: can you add more comments in this section to explain what this does? This section is becoming more complicated.
Reply: I tried, but I'm not quite sure what to add.
||
case s: Source => | ||
updateStatusMessage(s"Getting offsets from $s") | ||
reportTimeTaken("getOffset") { | ||
(s, s.getOffset) | ||
} | ||
case s: MicroBatchReader => | ||
updateStatusMessage(s"Getting offsets from $s") | ||
reportTimeTaken("getOffset") { | ||
// Once v1 streaming source execution is gone, we can restructure this to be cleaner. | ||
// For now, we set the range here to get the source to infer the available end offset, | ||
// get that offset, and then set the range again when we later execute. | ||
if (availableOffsets.get(s).isDefined) { | ||
val offsetJson = availableOffsets.get(s).get.json | ||
s.setOffsetRange( | ||
Optional.of(s.deserializeOffset(offsetJson)), | ||
Review comment: This code can be simplified further if there is a conversion from Option to Optional. Maybe add an implicit conversion class for that; it would help in other places as well.
Reply: If we want a conversion, I'd prefer to pull in scala-java8-compat, rather than roll our own that we'd inevitably have to replace. (But that module is marked experimental, so maybe that's not the right tradeoff.)
Reply: Why would we have to replace it inevitably?
(A sketch of such a conversion helper follows at the end of this hunk.)
||
Optional.empty()) | ||
} else { | ||
s.setOffsetRange(Optional.empty(), Optional.empty()) | ||
} | ||
|
||
(s, Some(s.getEndOffset)) | ||
} | ||
}.toMap | ||
availableOffsets ++= latestOffsets.filter { case (s, o) => o.nonEmpty }.mapValues(_.get) | ||
availableOffsets ++= latestOffsets.filter { case (_, o) => o.nonEmpty }.mapValues(_.get) | ||
|
||
if (dataAvailable) { | ||
true | ||
|
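Regarding the Option/Optional discussion in the thread above: a minimal sketch of what a hand-rolled conversion helper could look like if scala-java8-compat is not pulled in. The `OptionalConverters` object and the `toOptional` name are hypothetical, not part of this PR.

```scala
import java.util.Optional

// Hypothetical helper (not in this PR): enrich Option with a toOptional method.
object OptionalConverters {
  implicit class RichOption[T](val opt: Option[T]) extends AnyVal {
    def toOptional: Optional[T] = opt match {
      case Some(v) => Optional.of(v)
      case None    => Optional.empty[T]()
    }
  }
}

// With the implicit in scope, the two-branch setOffsetRange call above could collapse to
// something along the lines of:
//   s.setOffsetRange(
//     availableOffsets.get(s).map(off => s.deserializeOffset(off.json)).toOptional,
//     Optional.empty())
```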
@@ -317,6 +358,8 @@ class MicroBatchExecution( | |
if (prevBatchOff.isDefined) { | ||
prevBatchOff.get.toStreamProgress(sources).foreach { | ||
case (src: Source, off) => src.commit(off) | ||
case (reader: MicroBatchReader, off) => | ||
reader.commit(reader.deserializeOffset(off.json)) | ||
Review comment: Why not call …
Reply: Isn't this a SerializedOffset here?
Reply: I am not sure. I am just comparing with the previous case.
Reply: Quick summary: V1 sources were silently responsible for checking every offset they receive, and deserializing it if it's a SerializedOffset. This is awkward, so SerializedOffset isn't being migrated to the v2 API. For v2 sources, all Offset instances passed to a reader will have the right type in the first place, and the execution engine will deserialize them from JSON form with the deserializeOffset handle. In the long term, the conversion to SerializedOffset can be removed entirely. But as long as the v1 path is around, we can't (without a lot of pointless effort) change the offset log to not return Offset instances. So we have to pull the JSON out of the SerializedOffset and then deserialize it properly. (A short illustration of this v1/v2 difference follows at the end of this hunk.)
Reply: Gotcha. Makes sense.
||
} | ||
} else { | ||
throw new IllegalStateException(s"batch $currentBatchId doesn't exist") | ||
|
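To make the "Quick summary" in the thread above concrete, here is a rough sketch of the difference in offset handling. `MyOffset` and `commitV1Style` are hypothetical names for illustration; the sketch assumes the v1 `Offset` and `SerializedOffset` classes in `org.apache.spark.sql.execution.streaming`.

```scala
import org.apache.spark.sql.execution.streaming.{Offset, SerializedOffset}

// Hypothetical offset type for illustration only.
case class MyOffset(value: Long) extends Offset {
  override def json: String = value.toString
}

// v1 style: every source must unwrap SerializedOffset from the offset log itself.
def commitV1Style(end: Offset): Unit = {
  val typed = end match {
    case SerializedOffset(json) => MyOffset(json.toLong) // source-specific re-parsing
    case o: MyOffset            => o
  }
  println(s"committing $typed") // stand-in for the real commit
}

// v2 style: the engine deserializes before the reader ever sees the offset, as in the
// diff above: reader.commit(reader.deserializeOffset(off.json))
```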
@@ -357,31 +400,39 @@ class MicroBatchExecution( | |
s"DataFrame returned by getBatch from $source did not have isStreaming=true\n" + | ||
s"${batch.queryExecution.logical}") | ||
logDebug(s"Retrieving data from $source: $current -> $available") | ||
Some(source -> batch) | ||
Some(source -> batch.logicalPlan) | ||
case (reader: MicroBatchReader, available) | ||
if committedOffsets.get(reader).map(_ != available).getOrElse(true) => | ||
val current = committedOffsets.get(reader).map(off => reader.deserializeOffset(off.json)) | ||
reader.setOffsetRange( | ||
Optional.ofNullable(current.orNull), | ||
Optional.of(available.asInstanceOf[v2.streaming.reader.Offset])) | ||
|
||
logDebug(s"Retrieving data from $reader: $current -> $available") | ||
Some(reader -> | ||
new StreamingDataSourceV2Relation(reader.readSchema().toAttributes, reader)) | ||
Review comment: Maybe name this class MicrobatchDataSourceV2Relation to be more specific?
Reply: Continuous execution actually should be using it too, since the isStreaming bit should still be set.
Reply: Does the ContinuousExecution use it? It seems like this class was added in this PR. Are you planning to modify the ContinuousExecution to use it in the future?
||
case _ => None | ||
} | ||
} | ||
|
||
// A list of attributes that will need to be updated. | ||
val replacements = new ArrayBuffer[(Attribute, Attribute)] | ||
// Replace sources in the logical plan with data that has arrived since the last batch. | ||
val withNewSources = logicalPlan transform { | ||
val newBatchesPlan = logicalPlan transform { | ||
case StreamingExecutionRelation(source, output) => | ||
newData.get(source).map { data => | ||
val newPlan = data.logicalPlan | ||
assert(output.size == newPlan.output.size, | ||
newData.get(source).map { dataPlan => | ||
assert(output.size == dataPlan.output.size, | ||
s"Invalid batch: ${Utils.truncatedString(output, ",")} != " + | ||
s"${Utils.truncatedString(newPlan.output, ",")}") | ||
replacements ++= output.zip(newPlan.output) | ||
newPlan | ||
s"${Utils.truncatedString(dataPlan.output, ",")}") | ||
replacements ++= output.zip(dataPlan.output) | ||
dataPlan | ||
}.getOrElse { | ||
LocalRelation(output, isStreaming = true) | ||
} | ||
} | ||
|
||
// Rewire the plan to use the new attributes that were returned by the source. | ||
val replacementMap = AttributeMap(replacements) | ||
val triggerLogicalPlan = withNewSources transformAllExpressions { | ||
val newAttributePlan = newBatchesPlan transformAllExpressions { | ||
case a: Attribute if replacementMap.contains(a) => | ||
replacementMap(a).withMetadata(a.metadata) | ||
case ct: CurrentTimestamp => | ||
|
@@ -392,6 +443,21 @@ class MicroBatchExecution( | |
cd.dataType, cd.timeZoneId) | ||
} | ||
|
||
val triggerLogicalPlan = sink match { | ||
case _: Sink => newAttributePlan | ||
case s: MicroBatchWriteSupport => | ||
val writer = s.createMicroBatchWriter( | ||
s"$runId", | ||
Review comment: Should it not be …
||
currentBatchId, | ||
newAttributePlan.schema, | ||
outputMode, | ||
new DataSourceV2Options(extraOptions.asJava)) | ||
Option(writer.orElse(null)).map(WriteToDataSourceV2(_, newAttributePlan)).getOrElse { | ||
Review comment: Can you add a comment explaining why the fallback is a LocalRelation, and when the writer can be empty?
Reply: I don't think the writer can ever be empty. Would you prefer an assert here?
Reply: The writer can be empty. If the sink decides that data does not need to be written, then the returned writer can be None. See the docs of createWriter.
||
LocalRelation(newAttributePlan.schema.toAttributes, isStreaming = true) | ||
} | ||
case _ => throw new IllegalArgumentException("unknown sink type") | ||
Review comment: Add the sink to the string as well so that it's easy to debug.
||
} | ||
|
||
reportTimeTaken("queryPlanning") { | ||
lastExecution = new IncrementalExecution( | ||
sparkSessionToRunBatch, | ||
|
@@ -409,7 +475,12 @@ class MicroBatchExecution( | |
|
||
reportTimeTaken("addBatch") { | ||
SQLExecution.withNewExecutionId(sparkSessionToRunBatch, lastExecution) { | ||
sink.addBatch(currentBatchId, nextBatch) | ||
sink match { | ||
case s: Sink => s.addBatch(currentBatchId, nextBatch) | ||
case s: MicroBatchWriteSupport => | ||
// This doesn't accumulate any data - it just forces execution of the microbatch writer. | ||
nextBatch.collect() | ||
Review comment: Make it clear in the comments that this collect() does not accumulate any data, only forces the execution of the writer.
||
} | ||
} | ||
} | ||
|
||
|
|
@@ -33,8 +33,8 @@ import org.apache.spark.sql.execution.streaming.continuous.ContinuousRateStreamR | |
import org.apache.spark.sql.execution.streaming.sources.RateStreamV2Reader | ||
import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider} | ||
import org.apache.spark.sql.sources.v2._ | ||
import org.apache.spark.sql.sources.v2.streaming.ContinuousReadSupport | ||
import org.apache.spark.sql.sources.v2.streaming.reader.ContinuousReader | ||
import org.apache.spark.sql.sources.v2.streaming.{ContinuousReadSupport, MicroBatchReadSupport} | ||
import org.apache.spark.sql.sources.v2.streaming.reader.{ContinuousReader, MicroBatchReader} | ||
import org.apache.spark.sql.types._ | ||
import org.apache.spark.util.{ManualClock, SystemClock} | ||
|
||
|
@@ -236,6 +236,21 @@ class RateStreamSource( | |
s"rampUpTimeSeconds=$rampUpTimeSeconds, numPartitions=$numPartitions]" | ||
} | ||
|
||
/** | ||
* This is a temporary register as we build out v2 migration. Microbatch read support should | ||
* be implemented in the same register as v1. | ||
*/ | ||
class RateSourceProviderV2 extends DataSourceV2 with MicroBatchReadSupport with DataSourceRegister { | ||
Review comment: Move this to the file RateStreamSourceV2.scala.
||
override def createMicroBatchReader( | ||
schema: Optional[StructType], | ||
checkpointLocation: String, | ||
options: DataSourceV2Options): MicroBatchReader = { | ||
new RateStreamV2Reader(options) | ||
} | ||
|
||
override def shortName(): String = "ratev2" | ||
} | ||
|
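A hedged usage sketch of the temporary `ratev2` register above (e.g. from spark-shell, where `spark` is in scope). The option names `rowsPerSecond` and `numPartitions` are assumed to mirror the v1 rate source and may differ in the actual v2 reader; whether the micro-batch path resolves this register end to end depends on the rest of this PR.

```scala
// Resolved through DataSourceRegister via shortName() = "ratev2".
val rateDf = spark.readStream
  .format("ratev2")
  .option("rowsPerSecond", "10")  // assumed option name
  .option("numPartitions", "2")   // assumed option name
  .load()

val query = rateDf.writeStream
  .format("console")
  .start()

// query.stop() when done.
```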
||
object RateStreamSource { | ||
|
||
/** Calculate the end value we will emit at the time `seconds`. */ | ||
|
|
@@ -32,13 +32,17 @@ import org.apache.spark.sql.sources.v2.DataSourceV2Options | |
import org.apache.spark.sql.sources.v2.reader._ | ||
import org.apache.spark.sql.sources.v2.streaming.reader.{MicroBatchReader, Offset} | ||
import org.apache.spark.sql.types.{LongType, StructField, StructType, TimestampType} | ||
import org.apache.spark.util.SystemClock | ||
import org.apache.spark.util.{ManualClock, SystemClock} | ||
|
||
class RateStreamV2Reader(options: DataSourceV2Options) | ||
Review comment: Can you rename this to MicroBatchRateStreamReader, to make it consistent with ContinuousRateStreamReader?
||
extends MicroBatchReader { | ||
implicit val defaultFormats: DefaultFormats = DefaultFormats | ||
|
||
val clock = new SystemClock | ||
val clock = { | ||
// The option to use a manual clock is provided only for unit testing purposes. | ||
if (options.get("useManualClock").orElse("false").toBoolean) new ManualClock | ||
else new SystemClock | ||
} | ||
|
||
private val numPartitions = | ||
options.get(RateStreamSourceV2.NUM_PARTITIONS).orElse("5").toInt | ||
|
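A brief sketch of how the testing-only `useManualClock` option above might be exercised; the reader constructor and options class come from the diff, while the test framing here is illustrative.

```scala
import scala.collection.JavaConverters._

import org.apache.spark.sql.execution.streaming.sources.RateStreamV2Reader
import org.apache.spark.sql.sources.v2.DataSourceV2Options
import org.apache.spark.util.ManualClock

// Build the reader with the testing-only option so time can be advanced deterministically.
val options = new DataSourceV2Options(Map("useManualClock" -> "true").asJava)
val reader = new RateStreamV2Reader(options)

// The clock is a ManualClock in this configuration; a test can now advance it explicitly.
reader.clock.asInstanceOf[ManualClock].advance(1000)
```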
@@ -111,7 +115,7 @@ class RateStreamV2Reader(options: DataSourceV2Options) | |
|
||
val packedRows = mutable.ListBuffer[(Long, Long)]() | ||
var outVal = startVal + numPartitions | ||
var outTimeMs = startTimeMs + msPerPartitionBetweenRows | ||
var outTimeMs = startTimeMs | ||
Review comment: Why this change?
Reply: The original behavior was an off-by-one error. With 1 partition and 1 row per second, for example, every row would come timestamped 1 second after it was actually generated. (A small numeric illustration follows after this hunk.)
||
while (outVal <= endVal) { | ||
packedRows.append((outTimeMs, outVal)) | ||
outVal += numPartitions | ||
|
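A small, self-contained illustration of the off-by-one described in the thread above, using hypothetical values (1 partition, 1000 ms between rows, batch start at t = 0):

```scala
object TimestampOffByOneDemo extends App {
  val startTimeMs = 0L
  val msPerPartitionBetweenRows = 1000L

  // Before the fix: the first row was stamped one interval after the batch start.
  val oldFirstTimestampMs = startTimeMs + msPerPartitionBetweenRows // 1000 ms -> one second late
  // After the fix: the first row is stamped at the time it was actually generated.
  val newFirstTimestampMs = startTimeMs // 0 ms

  println(s"old: $oldFirstTimestampMs ms, new: $newFirstTimestampMs ms")
}
```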
Review comment: Nit: "Otherwwise"?? :)