[SPARK-4062][Streaming] Add ReliableKafkaReceiver in Spark Streaming Kafka connector

Add ReliableKafkaReceiver to the Kafka connector to prevent data loss when the write-ahead log (WAL) in Spark Streaming is enabled. Details and the design doc can be found in [SPARK-4062](https://issues.apache.org/jira/browse/SPARK-4062).
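For context, a minimal sketch of how an application would opt into the reliable receiver added by this patch: KafkaUtils.createStream picks ReliableKafkaReceiver whenever spark.streaming.receiver.writeAheadLog.enable is true, so no new user-facing API is required. The ZooKeeper quorum, consumer group, topic name, and checkpoint path below are placeholders, not part of this change.

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object ReliableKafkaExample {
  def main(args: Array[String]): Unit = {
    // Enabling the receiver write-ahead log makes KafkaUtils.createStream
    // return a stream backed by ReliableKafkaReceiver instead of KafkaReceiver.
    val conf = new SparkConf()
      .setAppName("ReliableKafkaExample")
      .set("spark.streaming.receiver.writeAheadLog.enable", "true")

    val ssc = new StreamingContext(conf, Seconds(2))
    // The write-ahead log is kept under the checkpoint directory, so one
    // must be configured (this path is a placeholder).
    ssc.checkpoint("hdfs:///tmp/reliable-kafka-checkpoint")

    // Placeholder ZooKeeper quorum, consumer group, and topic -> thread count.
    val messages = KafkaUtils.createStream(
      ssc,
      "zk-host:2181",
      "reliable-kafka-demo",
      Map("demo-topic" -> 1),
      StorageLevel.MEMORY_AND_DISK_SER)

    messages.map(_._2).print()

    ssc.start()
    ssc.awaitTermination()
  }
}

Since the patch reads the flag from ssc.conf (see the KafkaUtils change below), the same createStream call transparently falls back to the original KafkaReceiver when the flag is absent or false.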

Author: jerryshao <[email protected]>
Author: Tathagata Das <[email protected]>
Author: Saisai Shao <[email protected]>

Closes apache#2991 from jerryshao/kafka-refactor and squashes the following commits:

5461f1c [Saisai Shao] Merge pull request alteryx#8 from tdas/kafka-refactor3
eae4ad6 [Tathagata Das] Refactored KafkaStreamSuiteBased to eliminate KafkaTestUtils and made Java more robust.
fab14c7 [Tathagata Das] minor update.
149948b [Tathagata Das] Fixed mistake
14630aa [Tathagata Das] Minor updates.
d9a452c [Tathagata Das] Minor updates.
ec2e95e [Tathagata Das] Removed the receiver's locks and essentially reverted to Saisai's original design.
2a20a01 [jerryshao] Address some comments
9f636b3 [Saisai Shao] Merge pull request alteryx#5 from tdas/kafka-refactor
b2b2f84 [Tathagata Das] Refactored Kafka receiver logic and Kafka testsuites
e501b3c [jerryshao] Add Mima excludes
b798535 [jerryshao] Fix the missed issue
e5e21c1 [jerryshao] Change to while loop
ea873e4 [jerryshao] Further address the comments
98f3d07 [jerryshao] Fix comment style
4854ee9 [jerryshao] Address all the comments
96c7a1d [jerryshao] Update the ReliableKafkaReceiver unit test
8135d31 [jerryshao] Fix flaky test
a949741 [jerryshao] Address the comments
16bfe78 [jerryshao] Change the ordering of imports
0894aef [jerryshao] Add some comments
77c3e50 [jerryshao] Code refactor and add some unit tests
dd9aeeb [jerryshao] Initial commit for reliable Kafka receiver

(cherry picked from commit 5930f64)
Signed-off-by: Tathagata Das <[email protected]>
jerryshao authored and tdas committed Nov 14, 2014
1 parent f8810b6 commit 5b63158
Showing 10 changed files with 651 additions and 143 deletions.
@@ -17,13 +17,12 @@

package org.apache.spark.streaming.kafka

import java.util.Properties

import scala.collection.Map
import scala.reflect.{classTag, ClassTag}

import java.util.Properties
import java.util.concurrent.Executors

import kafka.consumer._
import kafka.consumer.{KafkaStream, Consumer, ConsumerConfig, ConsumerConnector}
import kafka.serializer.Decoder
import kafka.utils.VerifiableProperties

@@ -32,6 +31,7 @@ import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream._
import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.util.Utils

/**
* Input stream that pulls messages from a Kafka Broker.
@@ -51,12 +51,16 @@ class KafkaInputDStream[
@transient ssc_ : StreamingContext,
kafkaParams: Map[String, String],
topics: Map[String, Int],
useReliableReceiver: Boolean,
storageLevel: StorageLevel
) extends ReceiverInputDStream[(K, V)](ssc_) with Logging {

def getReceiver(): Receiver[(K, V)] = {
new KafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)
.asInstanceOf[Receiver[(K, V)]]
if (!useReliableReceiver) {
new KafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)
} else {
new ReliableKafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)
}
}
}

@@ -69,14 +73,15 @@ class KafkaReceiver[
kafkaParams: Map[String, String],
topics: Map[String, Int],
storageLevel: StorageLevel
) extends Receiver[Any](storageLevel) with Logging {
) extends Receiver[(K, V)](storageLevel) with Logging {

// Connection to Kafka
var consumerConnector : ConsumerConnector = null
var consumerConnector: ConsumerConnector = null

def onStop() {
if (consumerConnector != null) {
consumerConnector.shutdown()
consumerConnector = null
}
}

@@ -102,11 +107,11 @@
.newInstance(consumerConfig.props)
.asInstanceOf[Decoder[V]]

// Create Threads for each Topic/Message Stream we are listening
// Create threads for each topic/message Stream we are listening
val topicMessageStreams = consumerConnector.createMessageStreams(
topics, keyDecoder, valueDecoder)

val executorPool = Executors.newFixedThreadPool(topics.values.sum)
val executorPool = Utils.newDaemonFixedThreadPool(topics.values.sum, "KafkaMessageHandler")
try {
// Start the messages handler for each partition
topicMessageStreams.values.foreach { streams =>
@@ -117,13 +122,15 @@
}
}

// Handles Kafka Messages
private class MessageHandler[K: ClassTag, V: ClassTag](stream: KafkaStream[K, V])
// Handles Kafka messages
private class MessageHandler(stream: KafkaStream[K, V])
extends Runnable {
def run() {
logInfo("Starting MessageHandler.")
try {
for (msgAndMetadata <- stream) {
val streamIterator = stream.iterator()
while (streamIterator.hasNext()) {
val msgAndMetadata = streamIterator.next()
store((msgAndMetadata.key, msgAndMetadata.message))
}
} catch {
@@ -70,7 +70,8 @@ object KafkaUtils {
topics: Map[String, Int],
storageLevel: StorageLevel
): ReceiverInputDStream[(K, V)] = {
new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, storageLevel)
val walEnabled = ssc.conf.getBoolean("spark.streaming.receiver.writeAheadLog.enable", false)
new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, walEnabled, storageLevel)
}

/**
@@ -99,7 +100,6 @@ object KafkaUtils {
* @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
* in its own thread.
* @param storageLevel RDD storage level.
*
*/
def createStream(
jssc: JavaStreamingContext,
@@ -0,0 +1,282 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.streaming.kafka

import java.util.Properties
import java.util.concurrent.{ThreadPoolExecutor, ConcurrentHashMap}

import scala.collection.{Map, mutable}
import scala.reflect.{ClassTag, classTag}

import kafka.common.TopicAndPartition
import kafka.consumer.{Consumer, ConsumerConfig, ConsumerConnector, KafkaStream}
import kafka.message.MessageAndMetadata
import kafka.serializer.Decoder
import kafka.utils.{VerifiableProperties, ZKGroupTopicDirs, ZKStringSerializer, ZkUtils}
import org.I0Itec.zkclient.ZkClient

import org.apache.spark.{Logging, SparkEnv}
import org.apache.spark.storage.{StorageLevel, StreamBlockId}
import org.apache.spark.streaming.receiver.{BlockGenerator, BlockGeneratorListener, Receiver}
import org.apache.spark.util.Utils

/**
* ReliableKafkaReceiver offers the ability to reliably store data into BlockManager without loss.
* It is turned off by default and will be enabled when
* spark.streaming.receiver.writeAheadLog.enable is true. The difference compared to KafkaReceiver
* is that this receiver manages topic-partition/offset itself and updates the offset information
* after data is reliably stored as write-ahead log. Offsets will only be updated when data is
* reliably stored, so the potential data loss problem of KafkaReceiver can be eliminated.
*
* Note: ReliableKafkaReceiver will set auto.commit.enable to false to turn off automatic offset
* commit mechanism in Kafka consumer. So setting this configuration manually within kafkaParams
* will not take effect.
*/
private[streaming]
class ReliableKafkaReceiver[
K: ClassTag,
V: ClassTag,
U <: Decoder[_]: ClassTag,
T <: Decoder[_]: ClassTag](
kafkaParams: Map[String, String],
topics: Map[String, Int],
storageLevel: StorageLevel)
extends Receiver[(K, V)](storageLevel) with Logging {

private val groupId = kafkaParams("group.id")
private val AUTO_OFFSET_COMMIT = "auto.commit.enable"
private def conf = SparkEnv.get.conf

/** High level consumer to connect to Kafka. */
private var consumerConnector: ConsumerConnector = null

/** zkClient to connect to Zookeeper to commit the offsets. */
private var zkClient: ZkClient = null

/**
* A HashMap to manage the offset for each topic/partition, this HashMap is called in
* synchronized block, so mutable HashMap will not meet concurrency issue.
*/
private var topicPartitionOffsetMap: mutable.HashMap[TopicAndPartition, Long] = null

/** A concurrent HashMap to store the stream block id and related offset snapshot. */
private var blockOffsetMap: ConcurrentHashMap[StreamBlockId, Map[TopicAndPartition, Long]] = null

/**
* Manage the BlockGenerator in receiver itself for better managing block store and offset
* commit.
*/
private var blockGenerator: BlockGenerator = null

/** Thread pool running the handlers for receiving message from multiple topics and partitions. */
private var messageHandlerThreadPool: ThreadPoolExecutor = null

override def onStart(): Unit = {
logInfo(s"Starting Kafka Consumer Stream with group: $groupId")

// Initialize the topic-partition / offset hash map.
topicPartitionOffsetMap = new mutable.HashMap[TopicAndPartition, Long]

// Initialize the stream block id / offset snapshot hash map.
blockOffsetMap = new ConcurrentHashMap[StreamBlockId, Map[TopicAndPartition, Long]]()

// Initialize the block generator for storing Kafka message.
blockGenerator = new BlockGenerator(new GeneratedBlockHandler, streamId, conf)

if (kafkaParams.contains(AUTO_OFFSET_COMMIT) && kafkaParams(AUTO_OFFSET_COMMIT) == "true") {
logWarning(s"$AUTO_OFFSET_COMMIT should be set to false in ReliableKafkaReceiver, " +
"otherwise we will manually set it to false to turn off auto offset commit in Kafka")
}

val props = new Properties()
kafkaParams.foreach(param => props.put(param._1, param._2))
// Manually set "auto.commit.enable" to "false" no matter user explicitly set it to true,
// we have to make sure this property is set to false to turn off auto commit mechanism in
// Kafka.
props.setProperty(AUTO_OFFSET_COMMIT, "false")

val consumerConfig = new ConsumerConfig(props)

assert(!consumerConfig.autoCommitEnable)

logInfo(s"Connecting to Zookeeper: ${consumerConfig.zkConnect}")
consumerConnector = Consumer.create(consumerConfig)
logInfo(s"Connected to Zookeeper: ${consumerConfig.zkConnect}")

zkClient = new ZkClient(consumerConfig.zkConnect, consumerConfig.zkSessionTimeoutMs,
consumerConfig.zkConnectionTimeoutMs, ZKStringSerializer)

messageHandlerThreadPool = Utils.newDaemonFixedThreadPool(
topics.values.sum, "KafkaMessageHandler")

blockGenerator.start()

val keyDecoder = classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
.newInstance(consumerConfig.props)
.asInstanceOf[Decoder[K]]

val valueDecoder = classTag[T].runtimeClass.getConstructor(classOf[VerifiableProperties])
.newInstance(consumerConfig.props)
.asInstanceOf[Decoder[V]]

val topicMessageStreams = consumerConnector.createMessageStreams(
topics, keyDecoder, valueDecoder)

topicMessageStreams.values.foreach { streams =>
streams.foreach { stream =>
messageHandlerThreadPool.submit(new MessageHandler(stream))
}
}
}

override def onStop(): Unit = {
if (messageHandlerThreadPool != null) {
messageHandlerThreadPool.shutdown()
messageHandlerThreadPool = null
}

if (consumerConnector != null) {
consumerConnector.shutdown()
consumerConnector = null
}

if (zkClient != null) {
zkClient.close()
zkClient = null
}

if (blockGenerator != null) {
blockGenerator.stop()
blockGenerator = null
}

if (topicPartitionOffsetMap != null) {
topicPartitionOffsetMap.clear()
topicPartitionOffsetMap = null
}

if (blockOffsetMap != null) {
blockOffsetMap.clear()
blockOffsetMap = null
}
}

/** Store a Kafka message and the associated metadata as a tuple. */
private def storeMessageAndMetadata(
msgAndMetadata: MessageAndMetadata[K, V]): Unit = {
val topicAndPartition = TopicAndPartition(msgAndMetadata.topic, msgAndMetadata.partition)
val data = (msgAndMetadata.key, msgAndMetadata.message)
val metadata = (topicAndPartition, msgAndMetadata.offset)
blockGenerator.addDataWithCallback(data, metadata)
}

/** Update stored offset */
private def updateOffset(topicAndPartition: TopicAndPartition, offset: Long): Unit = {
topicPartitionOffsetMap.put(topicAndPartition, offset)
}

/**
* Remember the current offsets for each topic and partition. This is called when a block is
* generated.
*/
private def rememberBlockOffsets(blockId: StreamBlockId): Unit = {
// Get a snapshot of current offset map and store with related block id.
val offsetSnapshot = topicPartitionOffsetMap.toMap
blockOffsetMap.put(blockId, offsetSnapshot)
topicPartitionOffsetMap.clear()
}

/** Store the ready-to-be-stored block and commit the related offsets to zookeeper. */
private def storeBlockAndCommitOffset(
blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit = {
store(arrayBuffer.asInstanceOf[mutable.ArrayBuffer[(K, V)]])
Option(blockOffsetMap.get(blockId)).foreach(commitOffset)
blockOffsetMap.remove(blockId)
}

/**
* Commit the offset of Kafka's topic/partition, the commit mechanism follow Kafka 0.8.x's
* metadata schema in Zookeeper.
*/
private def commitOffset(offsetMap: Map[TopicAndPartition, Long]): Unit = {
if (zkClient == null) {
val thrown = new IllegalStateException("Zookeeper client is unexpectedly null")
stop("Zookeeper client is not initialized before commit offsets to ZK", thrown)
return
}

for ((topicAndPart, offset) <- offsetMap) {
try {
val topicDirs = new ZKGroupTopicDirs(groupId, topicAndPart.topic)
val zkPath = s"${topicDirs.consumerOffsetDir}/${topicAndPart.partition}"

ZkUtils.updatePersistentPath(zkClient, zkPath, offset.toString)
} catch {
case e: Exception =>
logWarning(s"Exception during commit offset $offset for topic" +
s"${topicAndPart.topic}, partition ${topicAndPart.partition}", e)
}

logInfo(s"Committed offset $offset for topic ${topicAndPart.topic}, " +
s"partition ${topicAndPart.partition}")
}
}

/** Class to handle received Kafka message. */
private final class MessageHandler(stream: KafkaStream[K, V]) extends Runnable {
override def run(): Unit = {
while (!isStopped) {
try {
val streamIterator = stream.iterator()
while (streamIterator.hasNext) {
storeMessageAndMetadata(streamIterator.next)
}
} catch {
case e: Exception =>
logError("Error handling message", e)
}
}
}
}

/** Class to handle blocks generated by the block generator. */
private final class GeneratedBlockHandler extends BlockGeneratorListener {

def onAddData(data: Any, metadata: Any): Unit = {
// Update the offset of the data that was added to the generator
if (metadata != null) {
val (topicAndPartition, offset) = metadata.asInstanceOf[(TopicAndPartition, Long)]
updateOffset(topicAndPartition, offset)
}
}

def onGenerateBlock(blockId: StreamBlockId): Unit = {
// Remember the offsets of topics/partitions when a block has been generated
rememberBlockOffsets(blockId)
}

def onPushBlock(blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit = {
// Store block and commit the blocks offset
storeBlockAndCommitOffset(blockId, arrayBuffer)
}

def onError(message: String, throwable: Throwable): Unit = {
reportError(message, throwable)
}
}
}