[SPARK-22912] v2 data source support in MicroBatchExecution #20097
Conversation
/cc @zsxwing
Test build #85445 has finished for PR 20097 at commit
Test build #85478 has finished for PR 20097 at commit
Test build #85491 has finished for PR 20097 at commit
retest this please
Test build #85525 has finished for PR 20097 at commit
Overall looks quite good, just a few minor comments.
@@ -35,6 +35,12 @@ case class DataSourceV2Relation(
  }
}

class StreamingDataSourceV2Relation(
Add docs. What is the purpose of this class?
@@ -68,8 +76,20 @@ class MicroBatchExecution(
  // "df.logicalPlan" has already used attributes of the previous `output`.
  StreamingExecutionRelation(source, output)(sparkSession)
})
case s @ StreamingRelationV2(v2DataSource, _, _, output, v1DataSource)
    if !v2DataSource.isInstanceOf[MicroBatchReadSupport] =>
case s @ StreamingRelationV2(source: MicroBatchReadSupport, _, options, output, _) =>
Nit: can you add a comment above this section of the code to explain what it is doing? It took me a while to remember the context.
    options)
  // Generate the V1 node to catch errors thrown within generation.
  try {
    StreamingRelation(v1DataSource)
nit: This line seems to be used in all 3 cases and could be deduped.
In fact, it's confusing to have a single line like this with no variable assignment, just to enforce the side effects of StreamingRelation.apply().
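For illustration, the dedup might look something like this (the helper name is hypothetical, not from the PR; only the StreamingRelation(v1DataSource) call comes from the diff above):

import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.execution.streaming.StreamingRelation

// Hypothetical helper: build the V1 node once, purely to surface generation errors,
// and call it from each of the three match cases instead of repeating the bare line.
private def validateV1Relation(v1DataSource: DataSource): Unit = {
  // StreamingRelation.apply() is invoked only for its error-raising side effects;
  // the returned node is intentionally discarded.
  StreamingRelation(v1DataSource)
}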
    case e: UnsupportedOperationException
        if e.getMessage.contains("does not support streamed reading") =>
      // If v1 wasn't supported for this source, that's fine; just proceed onwards with v2.
  }
Shouldn't this try...catch be added to the next case as well?
  try {
    StreamingRelation(v1DataSource)
  } catch {
    case e: UnsupportedOperationException
Can you link to the exception this is supposed to catch? Do we really have to match on the message string? That seems pretty brittle for something as crucial as checking whether streaming is supported.
spark/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
Line 266 in 9a2b65a
throw new UnsupportedOperationException(
I agree that it would be nice to change this exception, but I don't know whether we can.
On reflection, there's actually a better way to do this which does not need to use exceptions as control flow. I didn't notice before because lookupDataSource returns Class[_] for some reason.
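Roughly, the idea is to check the looked-up class instead of catching the exception. A minimal sketch, assuming lookupDataSource(name, conf) is the available signature and using StreamSourceProvider as the marker for v1 streamed reading (file-based sources would also need a FileFormat check; the variable names are illustrative):

import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.execution.streaming.StreamingRelation
import org.apache.spark.sql.sources.StreamSourceProvider

val providerClass: Class[_] =
  DataSource.lookupDataSource(sourceName, sparkSession.sessionState.conf)
if (classOf[StreamSourceProvider].isAssignableFrom(providerClass)) {
  // Safe to build the V1 node without relying on a message-matched exception.
  StreamingRelation(v1DataSource)
}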
class RateStreamV2Reader(options: DataSourceV2Options)
    extends MicroBatchReader {
  implicit val defaultFormats: DefaultFormats = DefaultFormats

  val clock = new SystemClock
  val clock = if (options.get("useManualClock").orElse("false").toBoolean) new ManualClock
nit: put this in a { ... }
Also mention that the manual clock is only used for testing, so that someone looking at the source does not mistake this for a publicly visible feature.
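For example, the suggested form might look like this (the comment wording is mine, not from the PR):

import org.apache.spark.util.{ManualClock, SystemClock}

val clock = {
  // useManualClock is only used by tests; it is not a publicly supported option.
  if (options.get("useManualClock").orElse("false").toBoolean) new ManualClock
  else new SystemClock
}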
private val numPartitions =
  options.get(RateStreamSourceV2.NUM_PARTITIONS).orElse("5").toInt
  options.get(RateStreamSourceV2.NUM_PARTITIONS).orElse("1").toInt
why this change?
@@ -111,7 +112,7 @@ class RateStreamV2Reader(options: DataSourceV2Options)

val packedRows = mutable.ListBuffer[(Long, Long)]()
var outVal = startVal + numPartitions
var outTimeMs = startTimeMs + msPerPartitionBetweenRows
var outTimeMs = startTimeMs
why this change?
The original behavior was an off-by-one error. With 1 partition and 1 row per second, for example, every row would come timestamped 1 second after it was actually generated.
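To make the off-by-one concrete, here is a self-contained sketch with made-up values (1 partition, 1 row per second, batch starting at t = 0; this is not the exact loop from the source):

import scala.collection.mutable

val startTimeMs = 0L
val msPerPartitionBetweenRows = 1000L
val packedRows = mutable.ListBuffer[(Long, Long)]()

var outTimeMs = startTimeMs  // fixed initialization
// var outTimeMs = startTimeMs + msPerPartitionBetweenRows  // old, off-by-one initialization
var outVal = 0L
while (outVal < 3) {
  packedRows += ((outTimeMs, outVal))
  outVal += 1
  outTimeMs += msPerPartitionBetweenRows
}
// With the fix: (0,0), (1000,1), (2000,2).
// With the old initialization, every timestamp is one interval late: (1000,0), (2000,1), (3000,2).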
@@ -317,6 +355,8 @@ class MicroBatchExecution(
if (prevBatchOff.isDefined) {
  prevBatchOff.get.toStreamProgress(sources).foreach {
    case (src: Source, off) => src.commit(off)
    case (reader: MicroBatchReader, off) =>
      reader.commit(reader.deserializeOffset(off.json))
Why not call reader.commit(off) directly?
Isn't this a SerializedOffset here?
I am not sure. I am just comparing with the previous case, src.commit(off).
I think until now it was the responsibility of the Source to check whether the passed Offset instance was an instance of the custom offset type (e.g. KafkaSourceOffset) and accordingly either use it directly or deserialize it. That avoids unnecessary conversions.
Quick summary:
V1 sources were silently responsible for checking every offset they receive, and deserializing it if it's a SerializedOffset.
This is awkward, so SerializedOffset isn't being migrated to the v2 API. For v2 sources, all Offset instances passed to a reader will have the right type in the first place, and the execution engine will deserialize them from JSON form with the deserializeOffset handle. In the long term, the conversion to SerializedOffset can be removed entirely.
But as long as the v1 path is around, we can't (without a lot of pointless effort) change the offset log to not return Offset instances. So we have to pull the JSON out of the SerializedOffset and then deserialize it properly.
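A rough sketch of the contrast (paraphrased; KafkaSourceOffset is just an example of a source-specific offset type, and the v1 body below is illustrative, not copied from any real source):

// v1: every Source has to recognize SerializedOffset itself.
class SomeV1Source extends Source {
  override def commit(end: Offset): Unit = {
    val typedOffset = end match {
      case so: SerializedOffset => KafkaSourceOffset(so)  // must deserialize it itself
      case ko: KafkaSourceOffset => ko                    // may already be typed
    }
    // ... commit typedOffset ...
  }
  // (other Source methods omitted)
}
// v2: the engine converts once, so readers only ever see properly typed offsets:
// reader.commit(reader.deserializeOffset(off.json))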
Gotcha. Makes sense.
@@ -236,14 +257,31 @@ class MicroBatchExecution(
val hasNewData = {
  awaitProgressLock.lock()
  try {
    val latestOffsets: Map[Source, Option[Offset]] = uniqueSources.map {
    val latestOffsets: Map[BaseStreamingSource, Option[Offset]] = uniqueSources.map {
nit: Can you add more comments in this section to explain what this does? This section is becoming more complicated.
I tried, but I'm not quite sure what to add.
    Optional.of(available.asInstanceOf[v2.streaming.reader.Offset]))
  logDebug(s"Retrieving data from $reader: $current -> $available")
  Some(reader ->
    new StreamingDataSourceV2Relation(reader.readSchema().toAttributes, reader))
Maybe name this class MicrobatchDataSourceV2Relation to be more specific?
Continuous execution actually should be using it too, since the isStreaming bit should still be set.
Does the ContinuousExecution use it? It seems like this class was added in this PR. Are you planning to modify the ContinuousExecution to use it in the future?
@@ -226,7 +226,7 @@ trait ProgressReporter extends Logging {
// 3. For each source, we sum the metrics of the associated execution plan leaves.
//
val logicalPlanLeafToSource = newData.flatMap { case (source, df) =>
  df.logicalPlan.collectLeaves().map { leaf => leaf -> source }
  df.collectLeaves().map { leaf => leaf -> source }
rename df -> logicalPlan
Test build #85641 has finished for PR 20097 at commit
case s: Sink => s.addBatch(currentBatchId, nextBatch)
case s: MicroBatchWriteSupport =>
  // Execute the V2 writer node in the query plan.
  nextBatch.collect()
Make it clear in the comments that this collect() does not accumulate any data, only forces the execution of the writer.
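For example, the comment might read something like this (wording is mine):

case s: MicroBatchWriteSupport =>
  // The V2 writer node is already part of nextBatch's plan, so this collect() does
  // not accumulate any rows on the driver; it only forces the writer node to execute.
  nextBatch.collect()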
    trigger,
    triggerClock,
    outputMode,
    extraOptions,
    deleteCheckpointOnStop))
case _ =>
  throw new AnalysisException(
Is this the only other option? I am afraid that in some random situation that matches none of the cases, this error will be thrown, which is misleading.
For example, what happens if the sink supports only MicroBatchWrite and a ContinuousTrigger is specified? Shouldn't that be an error with a different error message?
I think this is the only other option. MicroBatchWriteSupport and Sink will have already matched with any trigger, ContinuousWriteSupport will have already matched with a continuous trigger, and there aren't any other implementations of BaseStreamingSink.
I agree it's cleaner to check explicitly.
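For illustration, a fully explicit check could look something like this (the error messages and the exact shape of the match are illustrative, not the code in this PR):

(sink, trigger) match {
  case (_: ContinuousWriteSupport, _: ContinuousTrigger) =>
    // ... continuous execution ...
  case (s, _: ContinuousTrigger) =>
    throw new AnalysisException(
      s"Sink ${s.getClass.getName} does not support continuous processing, " +
        "but a continuous trigger was specified.")
  case (_: MicroBatchWriteSupport, _) | (_: Sink, _) =>
    // ... microbatch execution ...
  case _ =>
    throw new AnalysisException(s"Unsupported sink: $sink")
}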
Test build #85643 has finished for PR 20097 at commit
@@ -32,13 +32,17 @@ import org.apache.spark.sql.sources.v2.DataSourceV2Options
import org.apache.spark.sql.sources.v2.reader._
import org.apache.spark.sql.sources.v2.streaming.reader.{MicroBatchReader, Offset}
import org.apache.spark.sql.types.{LongType, StructField, StructType, TimestampType}
import org.apache.spark.util.SystemClock
import org.apache.spark.util.{ManualClock, SystemClock}

class RateStreamV2Reader(options: DataSourceV2Options)
Can you rename this to MicroBatchRateStreamReader, to make it consistent with ContinuousRateStreamReader?
 * This is a temporary register as we build out v2 migration. Microbatch read support should
 * be implemented in the same register as v1.
 */
class RateSourceProviderV2 extends DataSourceV2 with MicroBatchReadSupport with DataSourceRegister {
Move this to the file RateStreamSourceV2.scala
Test build #85646 has finished for PR 20097 at commit
Test build #85647 has finished for PR 20097 at commit
Just a few more minor comments.
    deleteCheckpointOnStop: Boolean)
  extends StreamExecution(
    sparkSession, name, checkpointRoot, analyzedPlan, sink,
    trigger, triggerClock, outputMode, deleteCheckpointOnStop) {

  private def toJava(
Super nit: I usually prefer moving such small, less important methods to the end of the class.
// Once v1 streaming source execution is gone, we can refactor this away.
// For now, we set the range here to get the source to infer the available end offset,
// get that offset, and then set the range again when we later execute.
s.setOffsetRange(
incorrect indentation.
val current = committedOffsets.get(reader).map(off => reader.deserializeOffset(off.json))
reader.setOffsetRange(
  toJava(current),
  Optional.of(available.asInstanceOf[v2.streaming.reader.Offset]))
v2.streaming.reader.Offset is being used in a lot of places. Please rename it to OffsetV2 in the imports and use that in all places.
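Concretely, the rename would look roughly like this when applied to the lines above:

import java.util.Optional
import org.apache.spark.sql.sources.v2.streaming.reader.{Offset => OffsetV2}

val current = committedOffsets.get(reader).map(off => reader.deserializeOffset(off.json))
reader.setOffsetRange(
  toJava(current),
  Optional.of(available.asInstanceOf[OffsetV2]))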
    newAttributePlan.schema,
    outputMode,
    new DataSourceV2Options(extraOptions.asJava))
  Option(writer.orElse(null)).map(WriteToDataSourceV2(_, newAttributePlan)).getOrElse {
Can you add a comment explaining why the fallback is a LocalRelation, and when the writer can be empty?
I don't think the writer can ever be empty. Would you prefer an assert here?
The writer can be empty. If the sink decides that data does not need to be written, then the returned writer can be None. See the docs of createWriter.
So just documenting that here is fine, to avoid confusion like this.
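For example, the documented version might read (comment wording is mine):

// createMicroBatchWriter may return an empty Optional when the sink decides that no
// data needs to be written for this batch. Fall back to an empty streaming
// LocalRelation so the rest of the plan still has a valid node to execute against.
Option(writer.orElse(null)).map(WriteToDataSourceV2(_, newAttributePlan)).getOrElse {
  LocalRelation(newAttributePlan.schema.toAttributes, isStreaming = true)
}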
Option(writer.orElse(null)).map(WriteToDataSourceV2(_, newAttributePlan)).getOrElse {
  LocalRelation(newAttributePlan.schema.toAttributes, isStreaming = true)
}
case _ => throw new IllegalArgumentException("unknown sink type")
Add the sink to the string as well so that it's easy to debug.
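For example (exact wording illustrative):

case _ => throw new IllegalArgumentException(s"unknown sink type: $sink")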
    deleteCheckpointOnStop: Boolean)
  extends StreamExecution(
    sparkSession, name, checkpointRoot, analyzedPlan, sink,
    trigger, triggerClock, outputMode, deleteCheckpointOnStop) {

  private def toJava(
      scalaOption: Option[v2.streaming.reader.Offset]): Optional[v2.streaming.reader.Offset] = {
Mentioned elsewhere as well: import the new Offset as OffsetV2 instead of using the full package path.
ds match {
  case s: MicroBatchReadSupport =>
    val tempReader = s.createMicroBatchReader(
      java.util.Optional.ofNullable(userSpecifiedSchema.orNull),
Please import Optional, the full path is used in multiple places.
@@ -35,6 +35,16 @@ case class DataSourceV2Relation(
  }
}

/**
 * A specialization of DataSourceV2Relation with the streaming bit set to true. Otherwwise identical
nit: Otherwwise ?? :)
override def shortName(): String = "ratev2"
}

class MicroBatchRateStreamReader(options: DataSourceV2Options)
As with the other kafka PR, can you rename these classes to start with "RateStream"? Only if it is not too much refactoring, otherwise we can clean this up later.
case v2Sink: ContinuousWriteSupport =>
  UnsupportedOperationChecker.checkForContinuous(analyzedPlan, outputMode)
  new StreamingQueryWrapper(new ContinuousExecution(
case (_: MicroBatchWriteSupport, _) | (_: Sink, _) =>
Shouldn't we throw an error for the case of a MicroBatchWriteSupport sink (one that does not have ContinuousWriteSupport) combined with a ContinuousTrigger?
As discussed offline, we do throw that error in the MicroBatchExecution constructor. Once all the pieces are in we'll need to refactor this a bit to get all the checking in the same place.
Test build #85730 has finished for PR 20097 at commit
LGTM. Merging it to master.
Hi, @tdas.
Yes. My bad. I didn't realize the branch had already been cut.
What changes were proposed in this pull request?
Support for v2 data sources in microbatch streaming.
How was this patch tested?
A very basic new unit test on the toy v2 implementation of rate source. Once we have a v1 source fully migrated to v2, we'll need to do more detailed compatibility testing.
Author: Jose Torres <[email protected]>
Closes #20097 from jose-torres/v2-impl.
Thank you, @tdas!
case _: Sink => newAttributePlan
case s: MicroBatchWriteSupport =>
  val writer = s.createMicroBatchWriter(
    s"$runId",
Should it not be id? How does having runId handle the scenario of restarting stream jobs?
What changes were proposed in this pull request?
Support for v2 data sources in microbatch streaming.
How was this patch tested?
A very basic new unit test on the toy v2 implementation of rate source. Once we have a v1 source fully migrated to v2, we'll need to do more detailed compatibility testing.