[SPARK-22908] Add kafka source and sink for continuous processing. #20096
@@ -0,0 +1,246 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.kafka010

import java.io._
import java.nio.charset.StandardCharsets
import java.util.concurrent.atomic.AtomicBoolean

import org.apache.commons.io.IOUtils
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.WakeupException

import org.apache.spark.internal.Logging
import org.apache.spark.sql.{DataFrame, Row, SparkSession, SQLContext}
import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Literal, UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, UnsafeRowWriter}
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, SerializedOffset}
import org.apache.spark.sql.kafka010.KafkaSource.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE, VERSION}
import org.apache.spark.sql.sources.v2.reader._
import org.apache.spark.sql.sources.v2.streaming.reader.{ContinuousDataReader, ContinuousReader, Offset, PartitionOffset}
import org.apache.spark.sql.types.StructType
import org.apache.spark.unsafe.types.UTF8String

class ContinuousKafkaReader(
    kafkaReader: KafkaOffsetReader,
Review comment: Please rename this to offsetReader or maybe offsetFetcher to distinguish this from all the Reader classes in DataSourceV2.
    executorKafkaParams: java.util.Map[String, Object],
    sourceOptions: Map[String, String],
    metadataPath: String,
    initialOffsets: KafkaOffsetRangeLimit,
    failOnDataLoss: Boolean)
  extends ContinuousReader with SupportsScanUnsafeRow with Logging {
  override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = {
    val mergedMap = offsets.map {
      case KafkaSourcePartitionOffset(p, o) => Map(p -> o)
    }.reduce(_ ++ _)
    KafkaSourceOffset(mergedMap)
  }
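
For context, a minimal sketch of what mergeOffsets produces; the reader instance, topic name, and offset values are illustrative, not part of the PR:

  // Hypothetical: two per-partition offsets collapse into one global source offset.
  val merged = reader.mergeOffsets(Array(
    KafkaSourcePartitionOffset(new TopicPartition("t", 0), 5L),
    KafkaSourcePartitionOffset(new TopicPartition("t", 1), 7L)))
  // merged == KafkaSourceOffset(Map(t-0 -> 5, t-1 -> 7))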
  private lazy val session = SparkSession.getActiveSession.get

Review comment: Would be nice to do some code organization in this class, e.g. all public methods and vals first, then all the private methods/vals/vars. Or all vals/vars first, then public methods, then private methods.

  private lazy val sc = session.sparkContext
  private lazy val pollTimeoutMs = sourceOptions.getOrElse(
    "kafkaConsumer.pollTimeoutMs",
    sc.conf.getTimeAsMs("spark.network.timeout", "120s").toString
  ).toLong

  private val maxOffsetsPerTrigger =
    sourceOptions.get("maxOffsetsPerTrigger").map(_.toLong)
  /**
   * Lazily initialize `initialPartitionOffsets` to make sure that `KafkaConsumer.poll` is only
   * called in StreamExecutionThread. Otherwise, interrupting a thread while running
   * `KafkaConsumer.poll` may hang forever (KAFKA-1894).
   */
  private lazy val initialPartitionOffsets = {

Review comment: is this used anywhere??
Author reply: oops, no

    val offsets = initialOffsets match {
      case EarliestOffsetRangeLimit => KafkaSourceOffset(kafkaReader.fetchEarliestOffsets())
      case LatestOffsetRangeLimit => KafkaSourceOffset(kafkaReader.fetchLatestOffsets())
      case SpecificOffsetRangeLimit(p) => fetchAndVerify(p)
    }
    logInfo(s"Initial offsets: $offsets")
    offsets.partitionToOffsets
  }
  private def fetchAndVerify(specificOffsets: Map[TopicPartition, Long]) = {

Review comment: This function seems to be a duplicate of the one in KafkaSource. Can you dedup? Maybe move this into the KafkaOffsetReader?

    val result = kafkaReader.fetchSpecificOffsets(specificOffsets)
    specificOffsets.foreach {
      case (tp, off) if off != KafkaOffsetRangeLimit.LATEST &&
          off != KafkaOffsetRangeLimit.EARLIEST =>
        if (result(tp) != off) {
          reportDataLoss(
            s"startingOffsets for $tp was $off but consumer reset to ${result(tp)}")
        }
      case _ =>
        // no real way to check that beginning or end is reasonable
    }
    KafkaSourceOffset(result)
  }
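
The dedup the reviewer asks for could take roughly this shape, as a hedged sketch; the method name fetchSpecificOffsetsAndVerify and its placement inside KafkaOffsetReader are assumptions, not part of this PR:

  // Hypothetical: shared verification living in KafkaOffsetReader, so the
  // microbatch source and this continuous reader can both call it.
  def fetchSpecificOffsetsAndVerify(
      specificOffsets: Map[TopicPartition, Long],
      reportDataLoss: String => Unit): KafkaSourceOffset = {
    val result = fetchSpecificOffsets(specificOffsets)
    specificOffsets.foreach {
      case (tp, off) if off != KafkaOffsetRangeLimit.LATEST &&
          off != KafkaOffsetRangeLimit.EARLIEST =>
        if (result(tp) != off) {
          reportDataLoss(
            s"startingOffsets for $tp was $off but consumer reset to ${result(tp)}")
        }
      case _ => // nothing to verify when starting from earliest/latest
    }
    KafkaSourceOffset(result)
  }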
  // Initialized when creating read tasks. If this diverges from the partitions at the latest
  // offsets, we need to reconfigure.
  // Exposed outside this object only for unit tests.
  private[sql] var knownPartitions: Set[TopicPartition] = _

  override def readSchema: StructType = KafkaOffsetReader.kafkaSchema
  private var offset: Offset = _
  override def setOffset(start: java.util.Optional[Offset]): Unit = {
    offset = start.orElse {
      val offsets = initialOffsets match {
        case EarliestOffsetRangeLimit => KafkaSourceOffset(kafkaReader.fetchEarliestOffsets())
        case LatestOffsetRangeLimit => KafkaSourceOffset(kafkaReader.fetchLatestOffsets())
        case SpecificOffsetRangeLimit(p) => fetchAndVerify(p)
      }
      logInfo(s"Initial offsets: $offsets")
      offsets
    }
  }

  override def getStartOffset(): Offset = offset

  override def deserializeOffset(json: String): Offset = {
    KafkaSourceOffset(JsonUtils.partitionOffsets(json))
  }
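
The JSON here is the Kafka source's offset encoding, a map of topic to partition-to-offset; the values below are illustrative:

  // Hypothetical round trip through the JSON offset encoding.
  val off = reader.deserializeOffset("""{"topic1":{"0":5,"1":7}}""")
  // off == KafkaSourceOffset(Map(topic1-0 -> 5, topic1-1 -> 7))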
  override def createUnsafeRowReadTasks(): java.util.List[ReadTask[UnsafeRow]] = {
    import scala.collection.JavaConverters._

    val oldStartOffsets = KafkaSourceOffset.getPartitionOffsets(offset)

Review comment: nit: oldStartPartitionOffsets

    val newPartitions =
      kafkaReader.fetchLatestOffsets().keySet.diff(oldStartOffsets.keySet)
    val newPartitionOffsets = kafkaReader.fetchEarliestOffsets(newPartitions.toSeq)
    val startOffsets = oldStartOffsets ++ newPartitionOffsets

    knownPartitions = startOffsets.keySet

    startOffsets.toSeq.map {
      case (topicPartition, start) =>
        ContinuousKafkaReadTask(
          topicPartition, start, executorKafkaParams, pollTimeoutMs, failOnDataLoss)
          .asInstanceOf[ReadTask[UnsafeRow]]
    }.asJava
  }
  /** Stop this source and free any resources it has allocated. */
  def stop(): Unit = synchronized {
    kafkaReader.close()
  }

  override def commit(end: Offset): Unit = {}

  override def needsReconfiguration(): Boolean = {
    knownPartitions != null && kafkaReader.fetchLatestOffsets().keySet != knownPartitions
  }

  override def toString(): String = s"KafkaSource[$kafkaReader]"
  /**
   * If `failOnDataLoss` is true, this method will throw an `IllegalStateException`.
   * Otherwise, just log a warning.
   */
  private def reportDataLoss(message: String): Unit = {
    if (failOnDataLoss) {
      throw new IllegalStateException(message + s". $INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE")
    } else {
      logWarning(message + s". $INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE")
    }
  }
}
case class ContinuousKafkaReadTask(

Review comment: add docs
Review comment: rename class to have Kafka at the start to make it easier to find.

    topicPartition: TopicPartition,
    start: Long,

Review comment: startOffset

    kafkaParams: java.util.Map[String, Object],
    pollTimeoutMs: Long,
    failOnDataLoss: Boolean)
  extends ReadTask[UnsafeRow] {

Review comment: nit: move this to the previous line.

  override def createDataReader(): ContinuousKafkaDataReader = {
    new ContinuousKafkaDataReader(topicPartition, start, kafkaParams, pollTimeoutMs, failOnDataLoss)
  }
}
class ContinuousKafkaDataReader(

Review comment: add docs
Review comment: rename class to begin with Kafka to make it easy to find.

    topicPartition: TopicPartition,
    start: Long,

Review comment: startOffset

    kafkaParams: java.util.Map[String, Object],
    pollTimeoutMs: Long,
    failOnDataLoss: Boolean)
  extends ContinuousDataReader[UnsafeRow] {

Review comment: nit: move this to the previous line.

  private val topic = topicPartition.topic
  private val kafkaPartition = topicPartition.partition
  private val consumer = CachedKafkaConsumer.createUncached(topic, kafkaPartition, kafkaParams)

  private val closed = new AtomicBoolean(false)

  private var nextKafkaOffset = start match {
    case s if s >= 0 => s
    case KafkaOffsetRangeLimit.EARLIEST => consumer.getAvailableOffsetRange().earliest

Review comment: I think the resolution should have been done at the time of creating the task. That keeps it more consistent -- once the task has been defined, everything has already been resolved and the output is deterministic.
Author reply: I think this resolution actually doesn't do anything at all, because the offsets were already resolved earlier. I had the resolution only here originally and forgot to delete it, so it ends up as a no-op. I'll remove it now.

    case _ => throw new IllegalArgumentException(s"Invalid start Kafka offset $start.")
  }
  private var currentRecord: ConsumerRecord[Array[Byte], Array[Byte]] = _
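
A hedged sketch of the reviewer's suggestion, resolving special offsets while the read tasks are being built so each DataReader starts from a concrete (>= 0) offset; the val name is hypothetical, and per the author's reply the EARLIEST branch above is already a no-op:

  // Hypothetical: resolve EARLIEST inside createUnsafeRowReadTasks(), so the task
  // definition is deterministic before it ships to executors.
  val resolvedStartOffsets = startOffsets.map {
    case (tp, KafkaOffsetRangeLimit.EARLIEST) =>
      tp -> kafkaReader.fetchEarliestOffsets(Seq(tp))(tp)
    case (tp, off) => tp -> off
  }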
  override def next(): Boolean = {
    var r: ConsumerRecord[Array[Byte], Array[Byte]] = null
    while (r == null) {
      r = consumer.get(
        nextKafkaOffset,
        untilOffset = Long.MaxValue,
        pollTimeoutMs = Long.MaxValue,

Review comment: Why this long value? See what is used in the KafkaSourceRDD of the legacy source.
Author reply: untilOffset is MaxValue because the read isn't expected to end. pollTimeoutMs is MaxValue because it should wait forever for a new value to show up.

        failOnDataLoss)
    }
    nextKafkaOffset = r.offset + 1
    currentRecord = r
    true
  }
  val sharedRow = new UnsafeRow(7)
  val bufferHolder = new BufferHolder(sharedRow)
  val rowWriter = new UnsafeRowWriter(bufferHolder, 7)

Review comment: nit: can we move this up to consolidate all the vals.
  override def get(): UnsafeRow = {
    bufferHolder.reset()

    if (currentRecord.key == null) {
      // The record has no key: mark the key column null. (isNullAt only queries the
      // null bit, so setNullAt is needed here to actually set it.)
      rowWriter.setNullAt(0)
    } else {
      rowWriter.write(0, currentRecord.key)
    }
    // Remaining columns follow KafkaOffsetReader.kafkaSchema:
    // value, topic, partition, offset, timestamp, timestampType.
    rowWriter.write(1, currentRecord.value)
    rowWriter.write(2, UTF8String.fromString(currentRecord.topic))
    rowWriter.write(3, currentRecord.partition)
    rowWriter.write(4, currentRecord.offset)
    rowWriter.write(5,
      DateTimeUtils.fromJavaTimestamp(new java.sql.Timestamp(currentRecord.timestamp)))
    rowWriter.write(6, currentRecord.timestampType.id)
    sharedRow.setTotalSize(bufferHolder.totalSize)
    sharedRow
  }
  override def getOffset(): KafkaSourcePartitionOffset = {
    KafkaSourcePartitionOffset(topicPartition, nextKafkaOffset)
  }

  override def close(): Unit = {

Review comment: question: is it guaranteed that the engine will call close() no matter what happens to the task using the ContinuousReader?
Author reply: close() is called in a terminal finally block for the thread polling the reader, which is ended at task completion.

    consumer.close()
  }
}
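
For orientation, the user-facing path that exercises this reader, sketched with the public structured streaming API; the bootstrap server, topic, and trigger interval are illustrative:

  // Hypothetical usage: continuous processing over a Kafka topic.
  import org.apache.spark.sql.streaming.Trigger

  val df = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "host1:9092")
    .option("subscribe", "topic1")
    .load()

  df.writeStream
    .format("console")
    .trigger(Trigger.Continuous("1 second"))
    .start()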
@@ -261,6 +261,10 @@ private[kafka010] case class CachedKafkaConsumer private(
    }
  }
  def wakeup(): Unit = {

Review comment: This does not seem to be used anywhere?

    consumer.wakeup()
  }
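
A sketch of how a polling thread could be unblocked via wakeup(); the try/catch below is hypothetical, modeled on the reader's next() loop, and assumes the WakeupException from the underlying KafkaConsumer propagates out of get():

  // Hypothetical: another thread calls wakeup() while this one is blocked in get(),
  // which surfaces as a WakeupException and lets the reader shut down cleanly.
  try {
    consumer.get(nextKafkaOffset, untilOffset = Long.MaxValue,
      pollTimeoutMs = Long.MaxValue, failOnDataLoss = false)
  } catch {
    case _: WakeupException => // stop polling and release the consumer
  }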
  /** Create a new consumer and reset cached states */
  private def resetConsumer(): Unit = {
    consumer.close()
Review comment: Add docs explaining params.
Review comment: Can we rename this and other classes such that all Kafka classes start with "Kafka"?
Review comment: Also don't forget to rename the file accordingly.