MapR [SPARK-135] Spark 2.2 with MapR Streams (Kafka 1.0) (apache#218)
* MapR [SPARK-135] Spark 2.2 with MapR Streams (Kafka 1.0)
Added MapR-Streams-specific EOF handling.
ekrivokonmapr committed Sep 19, 2019
1 parent 23e57c1 commit caeafa0
Showing 9 changed files with 170 additions and 280 deletions.
45 changes: 15 additions & 30 deletions external/kafka-0-9/pom.xml
@@ -29,6 +29,7 @@
<artifactId>spark-streaming-kafka-0-9_2.11</artifactId>
<properties>
<sbt.project.name>streaming-kafka-0-9</sbt.project.name>
<scala.kafka101.version>2.12</scala.kafka101.version>
</properties>
<packaging>jar</packaging>
<name>Spark Integration for Kafka 0.9</name>
@@ -50,36 +51,8 @@
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.9.0.0-mapr-1703</version>
<exclusions>
<exclusion>
<groupId>com.sun.jmx</groupId>
<artifactId>jmxri</artifactId>
</exclusion>
<exclusion>
<groupId>com.sun.jdmk</groupId>
<artifactId>jmxtools</artifactId>
</exclusion>
<exclusion>
<groupId>net.sf.jopt-simple</groupId>
<artifactId>jopt-simple</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_${scala.binary.version}</artifactId>
<version>0.9.0.0</version>
<scope>provided</scope>
<artifactId>kafka_${scala.kafka101.version}</artifactId>
<version>1.0.1-mapr-1801</version>
<exclusions>
<exclusion>
<groupId>com.sun.jmx</groupId>
@@ -118,6 +91,18 @@
<groupId>org.apache.spark</groupId>
<artifactId>spark-tags_${scala.binary.version}</artifactId>
</dependency>

<!--
This spark-tags test-dep is needed even though it isn't used in this module, otherwise testing-cmds that exclude
them will yield errors.
-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-tags_${scala.binary.version}</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>

</dependencies>
<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
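For reference, an equivalent dependency declaration for an sbt build might look like the sketch below. This is an illustration only; the MapR repository resolver URL is an assumption and is not taken from this diff.

  // build.sbt sketch: depend on the MapR build of the Kafka 1.0.1 client,
  // replacing the old 0.9.0.0-mapr-1703 kafka-clients coordinate.
  resolvers += "mapr-releases" at "https://repository.mapr.com/maven/" // assumed resolver URL
  libraryDependencies += "org.apache.kafka" % "kafka_2.12" % "1.0.1-mapr-1801"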
external/kafka-0-9/src/main/scala/org/apache/spark/streaming/kafka09/CachedKafkaConsumer.scala
@@ -16,14 +16,16 @@
*/

package org.apache.spark.streaming.kafka09

import java.{ util => ju }

import collection.JavaConverters._
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer}
import org.apache.kafka.common.{KafkaException, TopicPartition}
import org.apache.kafka.clients.consumer.{ ConsumerConfig, ConsumerRecord, KafkaConsumer }
import org.apache.kafka.common.{ KafkaException, TopicPartition }

import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging


/**
* Consumer of single topicpartition, intended for cached reuse.
* Underlying consumer is not threadsafe, so neither is this,
@@ -78,7 +80,11 @@ class CachedKafkaConsumer[K, V] private(

nextOffset = offset + 1

if (record.offset() == 0 && isStreams && buffer.hasNext) buffer.next() else record
if (record.offset() == KafkaUtils.eofOffset && isStreams && buffer.hasNext) {
buffer.next()
} else {
record
}
// Offsets in MapR-streams can contain gaps
/* if (record.offset < offset) {
logInfo(s"Buffer miss for $groupId $topic $partition $offset")
@@ -104,10 +110,10 @@
private def poll(timeout: Long): Unit = {
val p = consumer.poll(timeout)
val r = p.records(topicPartition)

logDebug(s"Polled ${p.partitions()} ${r.size}")
buffer = r.iterator()
buffer = r.iterator
}

}

private[kafka09]
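The EOF check added above, shown in isolation: MapR Streams can surface a marker record whose offset equals a sentinel exposed as KafkaUtils.eofOffset. The sentinel's concrete value lives elsewhere in this module and is not part of this diff, so it is stubbed below as an assumption. A minimal sketch of the skip logic:

  import java.{util => ju}
  import org.apache.kafka.clients.consumer.ConsumerRecord

  object EofSkipSketch {
    // Stand-in for KafkaUtils.eofOffset, which this diff references but does not define here.
    val eofOffset: Long = -1L

    // Return the record unless it is a MapR Streams EOF marker and more records
    // are buffered, in which case advance to the next buffered record.
    def skipEof[K, V](record: ConsumerRecord[K, V],
                      isStreams: Boolean,
                      buffer: ju.Iterator[ConsumerRecord[K, V]]): ConsumerRecord[K, V] =
      if (record.offset() == eofOffset && isStreams && buffer.hasNext) buffer.next() else record
  }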
external/kafka-0-9/src/main/scala/org/apache/spark/streaming/kafka09/ConsumerStrategy.scala
@@ -17,7 +17,8 @@

package org.apache.spark.streaming.kafka09

import java.{ lang => jl, util => ju }
import java.{lang => jl, util => ju}
import java.util.Locale

import scala.collection.JavaConverters._

@@ -32,7 +33,7 @@ import org.apache.spark.internal.Logging
* :: Experimental ::
* Choice of how to create and configure underlying Kafka Consumers on driver and executors.
* See [[ConsumerStrategies]] to obtain instances.
* Kafka 0.10 consumers can require additional, sometimes complex, setup after object
* Kafka 0.9 consumers can require additional, sometimes complex, setup after object
* instantiation. This interface encapsulates that process, and allows it to be checkpointed.
* @tparam K type of Kafka message key
* @tparam V type of Kafka message value
@@ -72,7 +73,7 @@ abstract class ConsumerStrategy[K, V] {
* auto.offset.reset will be used.
*/
private case class Subscribe[K, V](
topics: ju.List[jl.String],
topics: ju.Collection[jl.String],
kafkaParams: ju.Map[String, Object],
offsets: ju.Map[TopicPartition, jl.Long]
) extends ConsumerStrategy[K, V] with Logging {
@@ -93,10 +94,10 @@ private case class Subscribe[K, V](
// but cant seek to a position before poll, because poll is what gets subscription partitions
// So, poll, suppress the first exception, then seek
val aor = kafkaParams.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)
val shouldSuppress = aor != null && aor.asInstanceOf[String].toUpperCase == "NONE"
val shouldSuppress =
aor != null && aor.asInstanceOf[String].toUpperCase(Locale.ROOT) == "NONE"
try {
// consumer.poll(0)
KafkaUtils.waitForConsumerAssignment(consumer)
consumer.poll(0)
} catch {
case x: NoOffsetForPartitionException if shouldSuppress =>
logWarning("Catching NoOffsetForPartitionException since " +
@@ -105,10 +106,8 @@
toSeek.asScala.foreach { case (topicPartition, offset) =>
consumer.seek(topicPartition, offset)
}

// we've called poll, we must pause or next poll may consume messages and set position
val topicPartitions = consumer.assignment().asScala.toArray
consumer.pause(topicPartitions: _*)
consumer.pause(consumer.assignment())
}

consumer
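The workaround above as a standalone sketch; it assumes a KafkaConsumer that has already been subscribed and a map of offsets to seek to, and the Locale.ROOT comparison mirrors the change in this hunk:

  import java.{lang => jl, util => ju}
  import java.util.Locale
  import scala.collection.JavaConverters._
  import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer, NoOffsetForPartitionException}
  import org.apache.kafka.common.TopicPartition

  object Kafka3370WorkaroundSketch {
    // Work around KAFKA-3370: with auto.offset.reset=none the first poll(0) throws
    // NoOffsetForPartitionException before there is any chance to seek.
    def seekWithoutConsuming[K, V](consumer: KafkaConsumer[K, V],
                                   kafkaParams: ju.Map[String, Object],
                                   toSeek: ju.Map[TopicPartition, jl.Long]): Unit = {
      val aor = kafkaParams.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)
      val shouldSuppress =
        aor != null && aor.asInstanceOf[String].toUpperCase(Locale.ROOT) == "NONE"
      try {
        consumer.poll(0) // poll is what populates the subscription's assignment
      } catch {
        case _: NoOffsetForPartitionException if shouldSuppress => () // expected with reset=none
      }
      toSeek.asScala.foreach { case (tp, offset) => consumer.seek(tp, offset) }
      // poll has been called, so pause the assignment or the next poll may
      // consume messages and move the position.
      consumer.pause(consumer.assignment())
    }
  }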
@@ -148,10 +147,10 @@ private case class SubscribePattern[K, V](
if (!toSeek.isEmpty) {
// work around KAFKA-3370 when reset is none, see explanation in Subscribe above
val aor = kafkaParams.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)
val shouldSuppress = aor != null && aor.asInstanceOf[String].toUpperCase == "NONE"
val shouldSuppress =
aor != null && aor.asInstanceOf[String].toUpperCase(Locale.ROOT) == "NONE"
try {
// consumer.poll(0)
KafkaUtils.waitForConsumerAssignment(consumer)
consumer.poll(0)
} catch {
case x: NoOffsetForPartitionException if shouldSuppress =>
logWarning("Catching NoOffsetForPartitionException since " +
@@ -161,8 +160,7 @@
consumer.seek(topicPartition, offset)
}
// we've called poll, we must pause or next poll may consume messages and set position
val topicPartitions = consumer.assignment().asScala.toArray
consumer.pause(topicPartitions: _*)
consumer.pause(consumer.assignment())
}

consumer
@@ -183,7 +181,7 @@ private case class SubscribePattern[K, V](
* auto.offset.reset will be used.
*/
private case class Assign[K, V](
topicPartitions: ju.List[TopicPartition],
topicPartitions: ju.Collection[TopicPartition],
kafkaParams: ju.Map[String, Object],
offsets: ju.Map[TopicPartition, jl.Long]
) extends ConsumerStrategy[K, V] {
@@ -277,7 +275,7 @@ object ConsumerStrategies {
*/
@Experimental
def Subscribe[K, V](
topics: ju.List[jl.String],
topics: ju.Collection[jl.String],
kafkaParams: ju.Map[String, Object],
offsets: ju.Map[TopicPartition, jl.Long]): ConsumerStrategy[K, V] = {
new Subscribe[K, V](topics, kafkaParams, offsets)
@@ -296,7 +294,7 @@ object ConsumerStrategies {
*/
@Experimental
def Subscribe[K, V](
topics: ju.List[jl.String],
topics: ju.Collection[jl.String],
kafkaParams: ju.Map[String, Object]): ConsumerStrategy[K, V] = {
new Subscribe[K, V](topics, kafkaParams, ju.Collections.emptyMap[TopicPartition, jl.Long]())
}
@@ -452,7 +450,7 @@ object ConsumerStrategies {
*/
@Experimental
def Assign[K, V](
topicPartitions: ju.List[TopicPartition],
topicPartitions: ju.Collection[TopicPartition],
kafkaParams: ju.Map[String, Object],
offsets: ju.Map[TopicPartition, jl.Long]): ConsumerStrategy[K, V] = {
new Assign[K, V](topicPartitions, kafkaParams, offsets)
@@ -471,7 +469,7 @@ object ConsumerStrategies {
*/
@Experimental
def Assign[K, V](
topicPartitions: ju.List[TopicPartition],
topicPartitions: ju.Collection[TopicPartition],
kafkaParams: ju.Map[String, Object]): ConsumerStrategy[K, V] = {
new Assign[K, V](
topicPartitions,
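Because the factory methods now accept java.util.Collection rather than java.util.List, callers can pass any Java collection (or a converted Scala one). A usage sketch, assuming the kafka09 package exposes the factories exactly as declared above; the topic, broker, and group names are placeholders:

  import java.{util => ju}
  import scala.collection.JavaConverters._
  import org.apache.kafka.common.TopicPartition
  import org.apache.spark.streaming.kafka09.ConsumerStrategies

  object ConsumerStrategiesUsageSketch {
    val kafkaParams: ju.Map[String, Object] = Map[String, Object](
      "bootstrap.servers" -> "host1:9092,host2:9092", // hypothetical brokers
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "group.id" -> "example-group" // hypothetical group id
    ).asJava

    // Any Collection works now, e.g. a Set of topics converted from Scala...
    val byTopics = ConsumerStrategies.Subscribe[String, String](
      Set("/example-stream:topic-a", "/example-stream:topic-b").asJava, kafkaParams)

    // ...or explicit partitions handed to Assign.
    val byPartitions = ConsumerStrategies.Assign[String, String](
      Set(new TopicPartition("/example-stream:topic-a", 0)).asJava, kafkaParams)
  }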
external/kafka-0-9/src/main/scala/org/apache/spark/streaming/kafka09/DirectKafkaInputDStream.scala
@@ -42,14 +42,12 @@ import org.apache.spark.streaming.scheduler.rate.RateEstimator
* The spark configuration spark.streaming.kafka.maxRatePerPartition gives the maximum number
* of messages
* per second that each '''partition''' will accept.
* @param locationStrategy In most cases, pass in [[PreferConsistent]],
* @param locationStrategy In most cases, pass in [[LocationStrategies.PreferConsistent]],
* see [[LocationStrategy]] for more details.
* <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
* configuration parameters</a>.
* Requires "bootstrap.servers" to be set with Kafka broker(s),
* NOT zookeeper servers, specified in host1:port1,host2:port2 form.
* @param consumerStrategy In most cases, pass in [[Subscribe]],
* @param consumerStrategy In most cases, pass in [[ConsumerStrategies.Subscribe]],
* see [[ConsumerStrategy]] for more details
* @param ppc configuration of settings such as max rate on a per-partition basis.
* see [[PerPartitionConfig]] for more details.
* @tparam K type of Kafka message key
* @tparam V type of Kafka message value
*/
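For context, a minimal end-to-end sketch of wiring these pieces together. It assumes that KafkaUtils.createDirectStream in this kafka09 package takes (StreamingContext, LocationStrategy, ConsumerStrategy) like its kafka-0-10 counterpart, which the rest of this diff implies but does not show; all topic, broker, and group names are placeholders:

  import scala.collection.JavaConverters._
  import org.apache.spark.SparkConf
  import org.apache.spark.streaming.{Seconds, StreamingContext}
  import org.apache.spark.streaming.kafka09.{ConsumerStrategies, KafkaUtils, LocationStrategies}

  object DirectStreamSketch {
    def main(args: Array[String]): Unit = {
      val conf = new SparkConf().setAppName("kafka09-direct-sketch")
      val ssc = new StreamingContext(conf, Seconds(5))

      val kafkaParams = Map[String, Object](
        "bootstrap.servers" -> "host1:9092", // placeholder broker list
        "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
        "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
        "group.id" -> "example-group"
      ).asJava

      // PreferConsistent + Subscribe, as the parameter docs above recommend.
      val stream = KafkaUtils.createDirectStream[String, String](
        ssc,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe[String, String](
          Seq("/example-stream:topic-a").asJava, kafkaParams))

      stream.foreachRDD(rdd => println(s"records in batch: ${rdd.count()}"))
      ssc.start()
      ssc.awaitTermination()
    }
  }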
@@ -110,7 +108,7 @@ private[spark] class DirectKafkaInputDStream[K, V](
}

// Keep this consistent with how other streams are named (e.g. "Flume polling stream [2]")
private[streaming] override def name: String = s"Kafka 0.09 direct stream [$id]"
private[streaming] override def name: String = s"Kafka 0.9 direct stream [$id]"

protected[streaming] override val checkpointData =
new DirectKafkaInputDStreamCheckpointData
@@ -159,17 +157,6 @@ private[spark] class DirectKafkaInputDStream[K, V](
}
}

private def adjustPosition(tp: TopicPartition) = {
val c = consumer
val pos = c.position(tp)
if (pos == 0) {
val isStreams = tp.topic().startsWith("/") || tp.topic().contains(":")
if (isStreams) 1L else 0L
} else {
pos
}
}

/**
* The concern here is that poll might consume messages despite being paused,
* which would throw off consumer position. Fix position if this happens.
@@ -200,12 +187,11 @@ private[spark] class DirectKafkaInputDStream[K, V](
// make sure new partitions are reflected in currentOffsets
val newPartitions = parts.diff(currentOffsets.keySet)
// position for new partitions determined by auto.offset.reset if no commit
currentOffsets = currentOffsets ++ newPartitions.map(tp =>
tp -> adjustPosition(tp)).toMap
currentOffsets = currentOffsets ++ newPartitions.map(tp => tp -> c.position(tp)).toMap
// don't want to consume messages, so pause
c.pause(newPartitions.toArray : _*)
c.pause(newPartitions.asJava)
// find latest available offsets
c.seekToEnd(currentOffsets.keySet.toArray : _*)
c.seekToEnd(currentOffsets.keySet.asJava)
parts.map(tp => tp -> c.position(tp)).toMap
}
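The switch from the 0.9 varargs calls (pause(partitions: _*), seekToEnd(partitions: _*)) to Collection-based calls matches the newer consumer API pulled in by this commit. The same "pause new partitions, then read latest offsets" step as a standalone sketch, assuming a consumer that already has an assignment:

  import scala.collection.JavaConverters._
  import org.apache.kafka.clients.consumer.KafkaConsumer
  import org.apache.kafka.common.TopicPartition

  object LatestOffsetsSketch {
    // Pause newly seen partitions so the next poll cannot consume from them,
    // then seek to the end of all known partitions and record their positions.
    def latestOffsets[K, V](c: KafkaConsumer[K, V],
                            newPartitions: Set[TopicPartition],
                            known: Set[TopicPartition]): Map[TopicPartition, Long] = {
      c.pause(newPartitions.asJava) // Collection overload, Kafka 0.10+/1.0 client
      c.seekToEnd(known.asJava)
      known.map(tp => tp -> c.position(tp)).toMap
    }
  }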

@@ -227,8 +213,10 @@
val fo = currentOffsets(tp)
OffsetRange(tp.topic, tp.partition, fo, uo)
}
val rdd = new KafkaRDD[K, V](
context.sparkContext, executorKafkaParams, offsetRanges.toArray, getPreferredHosts, true)
val useConsumerCache = context.conf.getBoolean("spark.streaming.kafka.consumer.cache.enabled",
true)
val rdd = new KafkaRDD[K, V](context.sparkContext, executorKafkaParams, offsetRanges.toArray,
getPreferredHosts, useConsumerCache)

// Report the record number and metadata of this batch interval to InputInfoTracker.
val description = offsetRanges.filter { offsetRange =>
@@ -255,12 +243,12 @@
paranoidPoll(c)
if (currentOffsets.isEmpty) {
currentOffsets = c.assignment().asScala.map { tp =>
tp -> adjustPosition(tp)
tp -> c.position(tp)
}.toMap
}

// don't actually want to consume any messages, so pause all partitions
c.pause(currentOffsets.keySet.toArray: _*)
c.pause(currentOffsets.keySet.asJava)
}

override def stop(): Unit = this.synchronized {
@@ -330,7 +318,7 @@ private[spark] class DirectKafkaInputDStream[K, V](
b.map(OffsetRange(_)),
getPreferredHosts,
// during restore, it's possible same partition will be consumed from multiple
// threads, so dont use cache
// threads, so do not use cache.
false
)
}
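The executor-side consumer cache used in this file is driven by the Spark property read earlier in the diff (spark.streaming.kafka.consumer.cache.enabled, defaulting to true). A small configuration sketch:

  import org.apache.spark.SparkConf

  object ConsumerCacheConfigSketch {
    // Disable the cached Kafka consumer on executors, for example while
    // debugging multi-threaded consumption of the same partition.
    val conf = new SparkConf()
      .setAppName("kafka09-cache-config-sketch")
      .set("spark.streaming.kafka.consumer.cache.enabled", "false")
  }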