[SPARK-22908] Add kafka source and sink for continuous processing. #20096

Closed. Wants to merge 55 commits.

Commits (changes shown from 44 of 55 commits)
6308690
basic kafka
jose-torres Dec 21, 2017
95ff103
move reader close to data reader thread in case reader isn't thread safe
jose-torres Dec 24, 2017
599d001
test + small fixes
jose-torres Dec 27, 2017
88b261d
fixes lost in cherrypick
jose-torres Dec 27, 2017
1ff378b
fix sink test to use sink
jose-torres Dec 28, 2017
cd778ce
add SharedSQLContext to avoid multiple context error in jenkins
jose-torres Dec 28, 2017
de98a8b
await termination so SparkContext doesn't leak in jenkins
jose-torres Dec 28, 2017
db2dc93
fix after rebase
jose-torres Dec 28, 2017
df194c6
fix test framework race condition
jose-torres Dec 30, 2017
dae3a09
fix failure semantics in continuous kafka writer
jose-torres Jan 4, 2018
2574818
dedup fetchAndVerify
jose-torres Jan 5, 2018
eac756b
document read task and remove unused poll timeout
jose-torres Jan 5, 2018
9e95f63
rename from start to startOffset
jose-torres Jan 5, 2018
973fc7d
document data reader
jose-torres Jan 5, 2018
9998d91
remove redundant resolution
jose-torres Jan 5, 2018
a3adf1d
consolidate vals and remove unused flag
jose-torres Jan 5, 2018
71f236b
explicit close
jose-torres Jan 5, 2018
9530604
put back error check in write
jose-torres Jan 5, 2018
4dca800
name constructor param targetTopic
jose-torres Jan 5, 2018
fec5a00
refactor class names to start with kafka
jose-torres Jan 5, 2018
65ecf85
use case object
jose-torres Jan 5, 2018
fd555ce
Merge remote-tracking branch 'apache/master' into continuous-kafka
jose-torres Jan 5, 2018
24f7e1f
dedup kafka writing
jose-torres Jan 5, 2018
1662828
document writer classes
jose-torres Jan 5, 2018
c66850e
import optional
jose-torres Jan 5, 2018
7871de0
put kafka first in test class names
jose-torres Jan 5, 2018
f9ad94e
add comments about fallback
jose-torres Jan 5, 2018
be6c378
fix test suite
jose-torres Jan 5, 2018
1e5c7a9
fix flakiness in temp checkpoint cleanup
jose-torres Jan 5, 2018
d94461a
partial test refactor
jose-torres Jan 8, 2018
04cfae6
stress test
jose-torres Jan 9, 2018
94d5f50
Merge remote-tracking branch 'apache/master' into continuous-kafka
jose-torres Jan 9, 2018
3e5b787
comment out deletion test for now
jose-torres Jan 9, 2018
34c0a90
refactor stress test
jose-torres Jan 9, 2018
265a118
refactor main tests
jose-torres Jan 9, 2018
518c36a
rm wakeup
jose-torres Jan 9, 2018
e8cab95
java.util => ju
jose-torres Jan 9, 2018
9e2a16b
fix indent
jose-torres Jan 9, 2018
cc49d2f
row writer abstract
jose-torres Jan 9, 2018
2155acc
check err early
jose-torres Jan 9, 2018
b18072f
sink tests
jose-torres Jan 9, 2018
2261566
delete topic test
jose-torres Jan 9, 2018
341fb20
v2 offset
jose-torres Jan 9, 2018
2628bd4
don't processAllAvailable for continuous execution in StreamTest - th…
jose-torres Jan 9, 2018
3bdc5e7
cleanup sink properly
jose-torres Jan 9, 2018
2f1cc76
cleanup right map
jose-torres Jan 9, 2018
eafe670
synchronously stop epoch coordinator
jose-torres Jan 9, 2018
f825155
fix semantics
jose-torres Jan 9, 2018
9101ea6
add comment
jose-torres Jan 9, 2018
a3aaf27
move ser/deser test to microbatch
jose-torres Jan 9, 2018
9158af2
rm whitespace
jose-torres Jan 9, 2018
f434c09
add sink tests
jose-torres Jan 10, 2018
cd1bf24
improve docs
jose-torres Jan 10, 2018
514021c
move files
jose-torres Jan 10, 2018
f94b53e
add header
jose-torres Jan 10, 2018
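For context before the file diffs, here is a minimal sketch of how the continuous Kafka source and sink added in this PR might be driven from the public Structured Streaming API. The broker address, topic names, and checkpoint path are placeholders; the "kafka" format name and Trigger.Continuous are the existing APIs this PR plugs into, and this is a usage sketch rather than code from the PR itself.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder().appName("continuous-kafka-sketch").getOrCreate()

// Mirror one Kafka topic into another, record by record, using the continuous
// processing engine instead of microbatches.
val query = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:9092")   // placeholder brokers
  .option("subscribe", "input-topic")                // placeholder source topic
  .load()
  .selectExpr("key", "value")                        // the Kafka sink consumes key/value columns
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:9092")
  .option("topic", "output-topic")                   // placeholder target topic
  .option("checkpointLocation", "/tmp/continuous-kafka-checkpoint")
  .trigger(Trigger.Continuous("1 second"))           // continuous mode, 1s epoch interval
  .start()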
@@ -0,0 +1,232 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.kafka010

import java.{util => ju}

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition

import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, UnsafeRowWriter}
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.kafka010.KafkaSource.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE}
import org.apache.spark.sql.sources.v2.reader._
import org.apache.spark.sql.sources.v2.streaming.reader.{ContinuousDataReader, ContinuousReader, Offset, PartitionOffset}
import org.apache.spark.sql.types.StructType
import org.apache.spark.unsafe.types.UTF8String

/**
* A [[ContinuousReader]] for data from kafka.
*
* @param offsetReader a reader used to get kafka offsets. Note that the actual data will be
* read by per-task consumers generated later.
* @param kafkaParams String params for per-task Kafka consumers.
* @param sourceOptions The [[org.apache.spark.sql.sources.v2.DataSourceV2Options]] params which
* are not Kafka consumer params.
* @param metadataPath Path to a directory this reader can use for writing metadata.
* @param initialOffsets The Kafka offsets to start reading data at.
* @param failOnDataLoss Flag indicating whether reading should fail in data loss
* scenarios, where some offsets after the specified initial ones can't be
* properly read.
*/
class KafkaContinuousReader(
    offsetReader: KafkaOffsetReader,
    kafkaParams: ju.Map[String, Object],
    sourceOptions: Map[String, String],
    metadataPath: String,
    initialOffsets: KafkaOffsetRangeLimit,
    failOnDataLoss: Boolean)
  extends ContinuousReader with SupportsScanUnsafeRow with Logging {

  private lazy val session = SparkSession.getActiveSession.get
  private lazy val sc = session.sparkContext

  // Initialized when creating read tasks. If this diverges from the partitions at the latest
  // offsets, we need to reconfigure.
  // Exposed outside this object only for unit tests.
  private[sql] var knownPartitions: Set[TopicPartition] = _

  override def readSchema: StructType = KafkaOffsetReader.kafkaSchema

  private var offset: Offset = _
  override def setOffset(start: ju.Optional[Offset]): Unit = {
    offset = start.orElse {
      val offsets = initialOffsets match {
        case EarliestOffsetRangeLimit => KafkaSourceOffset(offsetReader.fetchEarliestOffsets())
        case LatestOffsetRangeLimit => KafkaSourceOffset(offsetReader.fetchLatestOffsets())
        case SpecificOffsetRangeLimit(p) => offsetReader.fetchSpecificOffsets(p, reportDataLoss)
      }
      logInfo(s"Initial offsets: $offsets")
      offsets
    }
  }

  override def getStartOffset(): Offset = offset

  override def deserializeOffset(json: String): Offset = {
    KafkaSourceOffset(JsonUtils.partitionOffsets(json))
  }

  override def createUnsafeRowReadTasks(): ju.List[ReadTask[UnsafeRow]] = {
    import scala.collection.JavaConverters._

    val oldStartPartitionOffsets = KafkaSourceOffset.getPartitionOffsets(offset)

    val currentPartitionSet = offsetReader.fetchEarliestOffsets().keySet
    val newPartitions = currentPartitionSet.diff(oldStartPartitionOffsets.keySet)
    val newPartitionOffsets = offsetReader.fetchEarliestOffsets(newPartitions.toSeq)

    val deletedPartitions = oldStartPartitionOffsets.keySet.diff(currentPartitionSet)
    if (deletedPartitions.nonEmpty) {
      reportDataLoss(s"Some partitions were deleted: $deletedPartitions")
    }

    val startOffsets = newPartitionOffsets ++
      oldStartPartitionOffsets.filterKeys(!deletedPartitions.contains(_))
    knownPartitions = startOffsets.keySet

    startOffsets.toSeq.map {
      case (topicPartition, start) =>
        KafkaContinuousReadTask(
          topicPartition, start, kafkaParams, failOnDataLoss)
          .asInstanceOf[ReadTask[UnsafeRow]]
    }.asJava
  }

  /** Stop this source and free any resources it has allocated. */
  def stop(): Unit = synchronized {
    offsetReader.close()
  }

  override def commit(end: Offset): Unit = {}

  override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = {
    val mergedMap = offsets.map {
      case KafkaSourcePartitionOffset(p, o) => Map(p -> o)
    }.reduce(_ ++ _)
    KafkaSourceOffset(mergedMap)
  }

  override def needsReconfiguration(): Boolean = {
    knownPartitions != null && offsetReader.fetchLatestOffsets().keySet != knownPartitions
  }

  override def toString(): String = s"KafkaSource[$offsetReader]"

  /**
   * If `failOnDataLoss` is true, this method will throw an `IllegalStateException`.
   * Otherwise, just log a warning.
   */
  private def reportDataLoss(message: String): Unit = {
    if (failOnDataLoss) {
      throw new IllegalStateException(message + s". $INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE")
    } else {
      logWarning(message + s". $INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE")
    }
  }
}

/**
* A read task for continuous Kafka processing. This will be serialized and transformed into a
* full reader on executors.
*
* @param topicPartition The (topic, partition) pair this task is responsible for.
* @param startOffset The offset to start reading from within the partition.
* @param kafkaParams Kafka consumer params to use.
* @param failOnDataLoss Flag indicating whether data reader should fail if some offsets
* are skipped.
*/
case class KafkaContinuousReadTask(
    topicPartition: TopicPartition,
    startOffset: Long,
    kafkaParams: ju.Map[String, Object],
    failOnDataLoss: Boolean) extends ReadTask[UnsafeRow] {
  override def createDataReader(): KafkaContinuousDataReader = {
    new KafkaContinuousDataReader(topicPartition, startOffset, kafkaParams, failOnDataLoss)
  }
}

/**
* A per-task data reader for continuous Kafka processing.
*
* @param topicPartition The (topic, partition) pair this data reader is responsible for.
* @param startOffset The offset to start reading from within the partition.
* @param kafkaParams Kafka consumer params to use.
* @param failOnDataLoss Flag indicating whether data reader should fail if some offsets
* are skipped.
*/
class KafkaContinuousDataReader(
    topicPartition: TopicPartition,
    startOffset: Long,
    kafkaParams: ju.Map[String, Object],
    failOnDataLoss: Boolean) extends ContinuousDataReader[UnsafeRow] {
  private val topic = topicPartition.topic
  private val kafkaPartition = topicPartition.partition
  private val consumer = CachedKafkaConsumer.createUncached(topic, kafkaPartition, kafkaParams)

  private val sharedRow = new UnsafeRow(7)
  private val bufferHolder = new BufferHolder(sharedRow)
  private val rowWriter = new UnsafeRowWriter(bufferHolder, 7)

  private var nextKafkaOffset = startOffset
  private var currentRecord: ConsumerRecord[Array[Byte], Array[Byte]] = _

  override def next(): Boolean = {
    var r: ConsumerRecord[Array[Byte], Array[Byte]] = null
    while (r == null) {
      r = consumer.get(
        nextKafkaOffset,
        untilOffset = Long.MaxValue,
        pollTimeoutMs = Long.MaxValue,
        failOnDataLoss)
    }
    nextKafkaOffset = r.offset + 1
    currentRecord = r
    true
  }

  override def get(): UnsafeRow = {
    bufferHolder.reset()

    if (currentRecord.key == null) {
      rowWriter.setNullAt(0)
    } else {
      rowWriter.write(0, currentRecord.key)
    }
    rowWriter.write(1, currentRecord.value)
    rowWriter.write(2, UTF8String.fromString(currentRecord.topic))
    rowWriter.write(3, currentRecord.partition)
    rowWriter.write(4, currentRecord.offset)
    rowWriter.write(5,
      DateTimeUtils.fromJavaTimestamp(new java.sql.Timestamp(currentRecord.timestamp)))
    rowWriter.write(6, currentRecord.timestampType.id)
    sharedRow.setTotalSize(bufferHolder.totalSize)
    sharedRow
  }

  override def getOffset(): KafkaSourcePartitionOffset = {
    KafkaSourcePartitionOffset(topicPartition, nextKafkaOffset)
  }

  override def close(): Unit = {
    consumer.close()
  }
}
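KafkaOffsetReader.kafkaSchema itself is not part of this diff, but the seven columns populated by KafkaContinuousDataReader.get() above presumably line up with the established Kafka source schema. A sketch of that assumed schema, for reference:

import org.apache.spark.sql.types._

// Assumed shape of KafkaOffsetReader.kafkaSchema, matching the seven writes in get().
val kafkaSchema: StructType = StructType(Seq(
  StructField("key", BinaryType),            // column 0, null when the record has no key
  StructField("value", BinaryType),          // column 1
  StructField("topic", StringType),          // column 2
  StructField("partition", IntegerType),     // column 3
  StructField("offset", LongType),           // column 4
  StructField("timestamp", TimestampType),   // column 5, from the record timestamp
  StructField("timestampType", IntegerType)  // column 6, the Kafka TimestampType id
))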
@@ -0,0 +1,119 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.kafka010

import org.apache.kafka.clients.producer.{Callback, ProducerRecord, RecordMetadata}
import scala.collection.JavaConverters._

import org.apache.spark.internal.Logging
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Literal, UnsafeProjection}
import org.apache.spark.sql.kafka010.KafkaSourceProvider.{kafkaParamsForProducer, TOPIC_OPTION_KEY}
import org.apache.spark.sql.kafka010.KafkaWriter.validateQuery
import org.apache.spark.sql.sources.v2.streaming.writer.ContinuousWriter
import org.apache.spark.sql.sources.v2.writer._
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.{BinaryType, StringType, StructType}

/**
* Dummy commit message. The DataSourceV2 framework requires a commit message implementation but we
* don't need to really send one.
*/
case object KafkaWriterCommitMessage extends WriterCommitMessage

/**
* A [[ContinuousWriter]] for Kafka writing. Responsible for generating the writer factory.
* @param topic The topic this writer is responsible for. If None, topic will be inferred from
* a `topic` field in the incoming data.
* @param producerParams Parameters for Kafka producers in each task.
* @param schema The schema of the input data.
*/
class KafkaContinuousWriter(
    topic: Option[String], producerParams: Map[String, String], schema: StructType)
  extends ContinuousWriter with SupportsWriteInternalRow {

  validateQuery(schema.toAttributes, producerParams.toMap[String, Object].asJava, topic)

  override def createInternalRowWriterFactory(): KafkaContinuousWriterFactory =
    KafkaContinuousWriterFactory(topic, producerParams, schema)

  override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
  override def abort(messages: Array[WriterCommitMessage]): Unit = {}
}

/**
* A [[DataWriterFactory]] for Kafka writing. Will be serialized and sent to executors to generate
* the per-task data writers.
* @param topic The topic that should be written to. If None, topic will be inferred from
* a `topic` field in the incoming data.
* @param producerParams Parameters for Kafka producers in each task.
* @param schema The schema of the input data.
*/
case class KafkaContinuousWriterFactory(
    topic: Option[String], producerParams: Map[String, String], schema: StructType)
  extends DataWriterFactory[InternalRow] {

  override def createDataWriter(partitionId: Int, attemptNumber: Int): DataWriter[InternalRow] = {
    new KafkaContinuousDataWriter(topic, producerParams, schema.toAttributes)
  }
}

/**
* A [[DataWriter]] for Kafka writing. One data writer will be created in each partition to
* process incoming rows.
*
* @param targetTopic The topic that this data writer is targeting. If None, topic will be inferred
* from a `topic` field in the incoming data.
* @param producerParams Parameters to use for the Kafka producer.
* @param inputSchema The attributes in the input data.
*/
class KafkaContinuousDataWriter(
    targetTopic: Option[String], producerParams: Map[String, String], inputSchema: Seq[Attribute])
  extends KafkaRowWriter(inputSchema, targetTopic) with DataWriter[InternalRow] {
  import scala.collection.JavaConverters._

  private lazy val producer = CachedKafkaProducer.getOrCreate(
    new java.util.HashMap[String, Object](producerParams.asJava))

  def write(row: InternalRow): Unit = {
    checkForErrors()
    sendRow(row, producer)
  }

  def commit(): WriterCommitMessage = {
    // Send is asynchronous, but we can't commit until all rows are actually in Kafka.
    // This requires flushing and then checking that no callbacks produced errors.
    // We also check for errors beforehand to fail as soon as possible - the check is cheap.
    checkForErrors()
    producer.flush()
[Inline review comment from a contributor] Maybe check for errors first before doing the flush; a flush can take a long time, and it's better to fail earlier if possible.

    checkForErrors()
    KafkaWriterCommitMessage
  }

  def abort(): Unit = {}

  def close(): Unit = {
    checkForErrors()
    if (producer != null) {
      producer.flush()
      checkForErrors()
      CachedKafkaProducer.close(new java.util.HashMap[String, Object](producerParams.asJava))
    }
  }
}
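KafkaRowWriter, which supplies sendRow() and checkForErrors(), lives in the shared writer code and is not shown in this hunk. A minimal illustrative sketch of the asynchronous-send, deferred-error-check pattern it presumably relies on (names such as AsyncErrorTrackingWriter and sendRecord are made up for this sketch):

import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord, RecordMetadata}

// Illustrative sketch only: producer.send() is asynchronous, so failures surface in the
// callback. The writer records the first failure and re-throws it on the next
// checkForErrors() call, which commit() invokes both before and after producer.flush().
abstract class AsyncErrorTrackingWriter {
  @volatile protected var failedWrite: Exception = _

  protected val sendCallback: Callback = new Callback {
    override def onCompletion(metadata: RecordMetadata, e: Exception): Unit = {
      if (failedWrite == null && e != null) {
        failedWrite = e
      }
    }
  }

  protected def checkForErrors(): Unit = {
    if (failedWrite != null) {
      throw failedWrite
    }
  }

  protected def sendRecord(
      producer: KafkaProducer[Array[Byte], Array[Byte]],
      record: ProducerRecord[Array[Byte], Array[Byte]]): Unit = {
    producer.send(record, sendCallback)
  }
}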
@@ -117,10 +117,14 @@ private[kafka010] class KafkaOffsetReader(
   * Resolves the specific offsets based on Kafka seek positions.
   * This method resolves offset value -1 to the latest and -2 to the
   * earliest Kafka seek position.
+  *
+  * @param partitionOffsets the specific offsets to resolve
+  * @param reportDataLoss callback to either report or log data loss depending on setting
   */
  def fetchSpecificOffsets(
-     partitionOffsets: Map[TopicPartition, Long]): Map[TopicPartition, Long] =
-   runUninterruptibly {
+     partitionOffsets: Map[TopicPartition, Long],
+     reportDataLoss: String => Unit): KafkaSourceOffset = {
+   val fetched = runUninterruptibly {
      withRetriesWithoutInterrupt {
        // Poll to get the latest assigned partitions
        consumer.poll(0)
@@ -145,6 +149,19 @@
      }
    }

+   partitionOffsets.foreach {
+     case (tp, off) if off != KafkaOffsetRangeLimit.LATEST &&
+         off != KafkaOffsetRangeLimit.EARLIEST =>
+       if (fetched(tp) != off) {
+         reportDataLoss(
+           s"startingOffsets for $tp was $off but consumer reset to ${fetched(tp)}")
+       }
+     case _ =>
+       // no real way to check that beginning or end is reasonable
+   }
+   KafkaSourceOffset(fetched)
+ }
+
  /**
   * Fetch the earliest offsets for the topic partitions that are indicated
   * in the [[ConsumerStrategy]].
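A hypothetical call site for the new fetchSpecificOffsets signature, showing how the -1/-2 sentinels described in the scaladoc and the data-loss callback fit together. The topic name, offsets, and the offsetReader/logWarning references here are illustrative, not taken from the PR:

import org.apache.kafka.common.TopicPartition

// -1L resolves to the latest offset and -2L to the earliest, while concrete offsets
// are verified against what the consumer actually resolved; any mismatch is reported
// through the callback.
val requested = Map(
  new TopicPartition("events", 0) -> 1500L,  // explicit offset, verified after the fetch
  new TopicPartition("events", 1) -> -1L,    // latest
  new TopicPartition("events", 2) -> -2L)    // earliest

val startOffset: KafkaSourceOffset =
  offsetReader.fetchSpecificOffsets(requested, msg => logWarning(msg))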