[SPARK-22908] Add kafka source and sink for continuous processing. #20096
Conversation
/cc @zsxwing
add to whitelist
Test build #85444 has finished for PR 20096 at commit
Test build #85448 has finished for PR 20096 at commit
Test build #85455 has finished for PR 20096 at commit
Force-pushed from bcaa694 to db2dc93
Test build #85479 has finished for PR 20096 at commit
Test build #85536 has finished for PR 20096 at commit
Round 1 of review. Some changes required.
import org.apache.spark.sql.types.StructType
import org.apache.spark.unsafe.types.UTF8String

class ContinuousKafkaReader(
Add docs explaining params.
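For illustration, a rough sketch of the kind of param docs this could get — the parameter list here is assumed for the sketch, not copied from the diff:

```scala
/**
 * A continuous-processing reader that streams data from Kafka.
 *
 * @param kafkaReader          helper used to fetch and verify Kafka offsets
 * @param executorKafkaParams  Kafka consumer params used on executors to poll data
 * @param sourceOptions        user-specified options for this source
 * @param metadataPath         checkpoint path where the initial offsets are persisted
 * @param initialOffsets       where to start reading when the query first starts
 * @param failOnDataLoss       whether to fail the query when data is lost (e.g. topic deleted)
 */
class ContinuousKafkaReader( /* ...params as documented above... */ )
```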
Can we rename this and the other classes such that all Kafka classes start with "Kafka"?
Also, don't forget to rename the file accordingly.
/** Companion object of the [[KafkaSourceOffset]] */
private[kafka010] object KafkaSourceOffset {

def getPartitionOffsets(offset: Offset): Map[TopicPartition, Long] = {
def getPartitionOffsets(offset: LegacyOffset): Map[TopicPartition, Long] = {
nit: can we use OffsetV1 or something like that to make the difference more obvious
import org.apache.spark.unsafe.types.UTF8String

class ContinuousKafkaReader(
    kafkaReader: KafkaOffsetReader,
Please rename this to offsetReader or maybe offsetFetcher to distinguish this from all the Reader classes in DataSourceV2
offsets.partitionToOffsets
}

private def fetchAndVerify(specificOffsets: Map[TopicPartition, Long]) = {
This function seems to be a duplicate of the one in KafkaSource. Can you dedup? Maybe move this into the KafkaOffsetReader?
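One possible shape for the shared helper, sketched with simplified signatures (the real KafkaOffsetReader internals aren't shown in this diff, so the fetch function is passed in here):

```scala
import org.apache.kafka.common.TopicPartition

object OffsetFetchHelper {
  /**
   * Fetch offsets for the requested partitions and verify Kafka returned exactly what
   * was asked for. Both the micro-batch KafkaSource and the continuous reader could
   * delegate to a helper like this instead of each keeping its own copy.
   */
  def fetchAndVerify(
      fetch: Map[TopicPartition, Long] => Map[TopicPartition, Long],
      specificOffsets: Map[TopicPartition, Long]): Map[TopicPartition, Long] = {
    val result = fetch(specificOffsets)
    specificOffsets.foreach { case (tp, expected) =>
      // -1L ("latest") and -2L ("earliest") are sentinels and may resolve to any offset.
      if (expected != -1L && expected != -2L && result.get(tp) != Some(expected)) {
        throw new IllegalStateException(
          s"Requested offset $expected for $tp but Kafka reset to ${result.get(tp)}")
      }
    }
    result
  }
}
```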
 * called in StreamExecutionThread. Otherwise, interrupting a thread while running
 * `KafkaConsumer.poll` may hang forever (KAFKA-1894).
 */
private lazy val initialPartitionOffsets = {
is this used anywhere??
oops, no
if (failedWrite != null) return

val projectedRow = projection(row)
val topic = projectedRow.getUTF8String(0)
This topic variable shadows the constructor param topic. I know that this pattern was present in the KafkaWriterTask, but let's not repeat the mistakes of the past. We can fix KafkaWriterTask once we migrate that to v2.
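A self-contained illustration of the shadowing issue and the rename fix (names here are made up, not the actual writer-task code):

```scala
class RowWriter(topic: Option[String]) {          // constructor param: the default topic
  def writeShadowed(row: Map[String, String]): String = {
    val topic = row("topic")                      // shadows the constructor param
    topic                                         // the default topic is unreachable here
  }

  def writeFixed(row: Map[String, String]): String = {
    val rowTopic = row.get("topic")               // distinct name, no shadowing
    rowTopic.orElse(topic).getOrElse(
      throw new IllegalArgumentException(
        "null topic present in the data and no default topic option set"))
  }
}
```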
options = extraOptions.toMap,
partitionColumns = normalizedParCols.getOrElse(Nil))
val sink = trigger match {
case _: ContinuousTrigger =>
Note for the future: All the checks for compatibility of sources and sinks wrt the specified trigger should be consolidated in one location. Right now the checks are spread between this file and StreamingQueryManager.
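A sketch of what such a consolidated check might look like; the types below are simplified stand-ins rather than the real Spark classes:

```scala
// All trigger/source/sink compatibility rules in one place, instead of spread across
// DataStreamWriter and StreamingQueryManager.
sealed trait TriggerKind
case object MicroBatchKind extends TriggerKind
case object ContinuousKind extends TriggerKind

object TriggerCompatibility {
  def check(
      trigger: TriggerKind,
      sourceSupportsContinuous: Boolean,
      sinkSupportsContinuous: Boolean): Unit = trigger match {
    case ContinuousKind if !sourceSupportsContinuous =>
      throw new IllegalArgumentException(
        "This source does not support continuous processing")
    case ContinuousKind if !sinkSupportsContinuous =>
      throw new IllegalArgumentException(
        "This sink does not support continuous processing")
    case _ => // compatible; nothing to check
  }
}
```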
assertAwaitThread()
def notDone = {
val localCommittedOffsets = committedOffsets
!localCommittedOffsets.contains(source) || localCommittedOffsets(source) != newOffset
if (sources.length <= sourceIndex) {
false
Why false? Shouldn't this throw an exception?
Sources is a var which might not be populated yet. (This race condition showed up in AddKafkaData in my tests.)
The race condition is present because sources is initialized to Seq.empty and then assigned to the actual sources. You can actually initialize sources to null, and then return notDone = false when sources is null. Any other mismatch should throw an error. I don't like the current code, which hides erroneous situations.
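A sketch of the suggested shape, with simplified stand-in types (the real fields live on StreamExecution):

```scala
object AwaitOffsetSketch {
  // null until the logical plan is analyzed, instead of Seq.empty, so that "not yet
  // populated" is distinguishable from a genuinely bad source index.
  @volatile var sources: Seq[String] = null

  def notDone(sourceIndex: Int,
              committedOffsets: Map[String, Long],
              newOffset: Long): Boolean = {
    if (sources == null) {
      false                                   // not initialized yet, per the suggestion
    } else if (sources.length <= sourceIndex) {
      // Once sources are populated, an out-of-range index is a real bug: fail loudly.
      throw new IllegalStateException(
        s"Source index $sourceIndex out of range; ${sources.length} sources registered")
    } else {
      val source = sources(sourceIndex)
      !committedOffsets.contains(source) || committedOffsets(source) != newOffset
    }
  }
}
```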
@@ -255,17 +255,21 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
}
}

case _ => throw new AnalysisException(s"$cls does not support data writing.")
case _ => saveToV1Source()
This section got more complicated with this change. Add more comments above this section explaining the fallback policy.
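For example, a comment along these lines could be added above the match (the wording is illustrative only, not taken from the eventual commit):

```scala
// If the data source implements the V2 write API, write through it. V2 sources that
// don't support writing, and sources that aren't V2 at all, fall back to the existing
// V1 path (saveToV1Source) so that current data sources keep working unchanged.
```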
}

Dataset.ofRows(sparkSession, DataSourceV2Relation(reader))
if (reader == null) {
This section got more complicated with this change. Add more comments above this section explaining the fallback policy.
Test build #85699 has finished for PR 20096 at commit
Test build #85876 has finished for PR 20096 at commit
Test build #85887 has finished for PR 20096 at commit
Test build #85875 has finished for PR 20096 at commit
The tests look great now. A few more missing tests here and there and then we will be good to go.
"continuous-stream-test-sql-context", | ||
sparkConf.set("spark.sql.testkey", "true"))) | ||
|
||
override protected def setTopicPartitions( |
Add a comment explaining what this method does. It is asserting something, so it does not look like it only "sets" something.
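Something along these lines might work; the wording is a guess at intent based on the reviewer's observation, and the parameter list is assumed:

```scala
/**
 * Changes the partition count of the given topic for the test and then asserts that the
 * running continuous query has reconfigured to pick up the new partitions — it both
 * mutates the topic and verifies the query reacted, rather than only "setting" state.
 */
override protected def setTopicPartitions(
    topic: String, newCount: Int, query: StreamExecution): Unit = {
  // ...
}
```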
import org.apache.spark.sql.streaming.{StreamTest, Trigger}
import org.apache.spark.sql.test.{SharedSQLContext, TestSparkSession}

trait KafkaContinuousTest extends KafkaSourceTest {
Add docs to explain what this class is for.
Also, since this is used not just by the source but also by the sink, it's better to define it in a different file.
}
}

class KafkaContinuousSourceSuite extends KafkaSourceSuiteBase with KafkaContinuousTest {
Add docs.
The { } may not be needed.
}

protected def makeSureGetOffsetCalled = AssertOnQuery { q =>
// Because KafkaSource's initialPartitionOffsets is set lazily, we need to make sure
// its "getOffset" is called before pushing any data. Otherwise, because of the race contion,
// its "getOffset" is called before pushing any data. Otherwise, because of the race contOOion,
spelling mistake?
I remember wondering this morning why my command-O key sequence wasn't working... I guess this is where it went.
import testImplicits._

test("(de)serialization of initial offsets") {
Is this needed in the common KafkaSourceSuiteBase?
)
)
StartStream(),
StopStream)
}

test("cannot stop Kafka stream") {
Is this needed in the KafkaSourceSuiteBase?
I think it makes sense to have a common test verifying the basic "start a stream and then stop it" flow, to provide a clear failure in case it's just completely broken by some change.
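A minimal sketch of such a shared smoke test; helper names follow the existing StreamTest/Kafka test utilities, and the exact wiring is assumed:

```scala
test("basic start and stop") {
  val topic = newTopic()
  testUtils.createTopic(topic, partitions = 5)

  val df = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", testUtils.brokerAddress)
    .option("subscribe", topic)
    .load()
    .selectExpr("CAST(value AS STRING)")

  // Only verifies the query can start and be stopped cleanly, giving a clear failure
  // if the basic flow is completely broken by some change.
  testStream(df)(
    StartStream(),
    StopStream
  )
}
```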
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") | ||
.as[(String, String)] | ||
val query = kafka.map(kv => kv._2.toInt).writeStream.foreach(new ForeachWriter[Int] { | ||
protected def startStream(ds: Dataset[Int]) = { |
I think this refactoring is not needed. startStream() is not used anywhere other than in this test, so I don't see the point of defining it outside the test.
startStream is overridden in the continuous version of this test.
@@ -0,0 +1,135 @@
/*
Rename this file to KafkaContinuousSourceSuite
}
}

test("streaming - write data with bad schema") {
Missing tests for "w/o topic field, with topic option" and "topic field and topic option". Also add a test for the case when the topic field is null.
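For instance, the "w/o topic field, with topic option" case might look roughly like this; helper names follow the existing Kafka sink suite, and the exact wiring is assumed:

```scala
test("streaming - write w/o topic field, with topic option") {
  val inputTopic = newTopic()
  testUtils.createTopic(inputTopic, partitions = 1)
  val outputTopic = newTopic()
  testUtils.createTopic(outputTopic, partitions = 1)

  val input = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", testUtils.brokerAddress)
    .option("subscribe", inputTopic)
    .option("startingOffsets", "earliest")
    .load()

  // No "topic" column in the projection, so the sink must use the "topic" option
  // as the default for every row.
  val writer = input
    .selectExpr("CAST(key AS STRING) key", "CAST(value AS STRING) value")
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", testUtils.brokerAddress)
    .option("topic", outputTopic)
    .option("checkpointLocation",
      java.nio.file.Files.createTempDirectory("cp").toString)
    .start()

  try {
    testUtils.sendMessages(inputTopic, Array("1", "2", "3"))
    // ...read outputTopic back and assert the three values arrived...
  } finally {
    writer.stop()
  }
}
```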
Test build #85888 has finished for PR 20096 at commit
retest this please
Test build #85892 has finished for PR 20096 at commit
Test build #85904 has finished for PR 20096 at commit
@@ -39,6 +39,15 @@ private[continuous] sealed trait EpochCoordinatorMessage extends Serializable
*/
private[sql] case object IncrementAndGetEpoch extends EpochCoordinatorMessage

/**
@zsxwing Can you take a look at the changes in this file?
looks good to me
Looks good as long as tests pass
retest this please
Test build #85926 has finished for PR 20096 at commit
retest this please
Test build #85934 has finished for PR 20096 at commit
## What changes were proposed in this pull request?

Add kafka source and sink for continuous processing. This involves two small changes to the execution engine:
* Bring data reader close() into the normal data reader thread to avoid thread safety issues.
* Fix up the semantics of the RECONFIGURING StreamExecution state. State updates are now atomic, and we don't have to deal with swallowing an exception.

## How was this patch tested?

new unit tests

Author: Jose Torres <[email protected]>
Closes #20096 from jose-torres/continuous-kafka.
(cherry picked from commit 6f7aaed)
Signed-off-by: Tathagata Das <[email protected]>
@jose-torres @zsxwing @tdas as discussed, this is causing a number of build timeouts. I'm going to revert this for now to de-flake the builds and we can add it back once it's fixed.
What changes were proposed in this pull request?
Add kafka source and sink for continuous processing. This involves two small changes to the execution engine:
* Bring data reader close() into the normal data reader thread to avoid thread safety issues.
* Fix up the semantics of the RECONFIGURING StreamExecution state. State updates are now atomic, and we don't have to deal with swallowing an exception.
How was this patch tested?
new unit tests