From 611e92068efad6df1c4c2893c79f61c6d9073bb7 Mon Sep 17 00:00:00 2001
From: Rostyslav Sotnychenko
Date: Wed, 1 Mar 2017 18:01:09 +0200
Subject: [PATCH] [MAPR-26289][SPARK-2.1] Streaming general improvements (#93)

* Added include-kafka-09 profile to Assembly

* Set default poll timeout to 120s
---
 assembly/pom.xml                                          | 10 ++++++++++
 .../org/apache/spark/streaming/kafka09/KafkaRDD.scala     |  3 ++-
 2 files changed, 12 insertions(+), 1 deletion(-)

diff --git a/assembly/pom.xml b/assembly/pom.xml
index edb04404fd42d..0b2ea633b426d 100644
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -168,6 +168,16 @@
       </dependencies>
     </profile>
+    <profile>
+      <id>include-kafka-09</id>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.spark</groupId>
+          <artifactId>spark-streaming-kafka-0-9_${scala.binary.version}</artifactId>
+          <version>${project.version}</version>
+        </dependency>
+      </dependencies>
+    </profile>
     <profile>
       <id>spark-ganglia-lgpl</id>
diff --git a/external/kafka-0-9/src/main/scala/org/apache/spark/streaming/kafka09/KafkaRDD.scala b/external/kafka-0-9/src/main/scala/org/apache/spark/streaming/kafka09/KafkaRDD.scala
index 02cc506d50953..29cef9662b030 100644
--- a/external/kafka-0-9/src/main/scala/org/apache/spark/streaming/kafka09/KafkaRDD.scala
+++ b/external/kafka-0-9/src/main/scala/org/apache/spark/streaming/kafka09/KafkaRDD.scala
@@ -65,7 +65,8 @@ private[spark] class KafkaRDD[K, V](
     " must be set to false for executor kafka params, else offsets may commit before processing")
 
   // TODO is it necessary to have separate configs for initial poll time vs ongoing poll time?
-  private val pollTimeout = conf.getLong("spark.streaming.kafka.consumer.poll.ms", 512)
+  private val pollTimeout = conf.getLong("spark.streaming.kafka.consumer.poll.ms",
+    conf.getTimeAsMs("spark.network.timeout", "120s"))
   private val cacheInitialCapacity =
     conf.getInt("spark.streaming.kafka.consumer.cache.initialCapacity", 16)
   private val cacheMaxCapacity =