[Streaming][Kafka][SPARK-8127] code cleanup
koeninger committed Jun 19, 2015
1 parent 9555b73 commit f68bd32
Showing 2 changed files with 9 additions and 11 deletions.
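
In short, the cleanup does two things: DirectKafkaInputDStream reports each batch's record count via rdd.count instead of summing offset ranges by hand, and error handling of KafkaCluster's Either results is routed through the shared KafkaCluster.checkErrors helper instead of inline folds and nested calls.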
external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala

@@ -120,8 +120,7 @@ class DirectKafkaInputDStream[
        context.sparkContext, kafkaParams, currentOffsets, untilOffsets, messageHandler)

      // Report the record number of this batch interval to InputInfoTracker.
-     val numRecords = rdd.offsetRanges.map(r => r.untilOffset - r.fromOffset).sum
-     val inputInfo = InputInfo(id, numRecords)
+     val inputInfo = InputInfo(id, rdd.count)
      ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)

      currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset)
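
Using rdd.count here is not a behavior change: elsewhere in SPARK-8127, count() on KafkaRDD is overridden to be computed from the RDD's offset ranges, so no records are fetched. A minimal sketch of that idea, with the range type simplified from the real OffsetRange (which also carries topic and partition):

object CountSketch {
  // Simplified stand-in for the real OffsetRange; the size of a range
  // is a pure offset difference, known before any data is read.
  case class OffsetRange(fromOffset: Long, untilOffset: Long) {
    def count: Long = untilOffset - fromOffset
  }

  // Summing range sizes gives the batch's record count with no Kafka reads.
  def rangeBasedCount(ranges: Array[OffsetRange]): Long =
    ranges.map(_.count).sum
}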
@@ -153,10 +152,7 @@ class DirectKafkaInputDStream[
    override def restore() {
      // this is assuming that the topics don't change during execution, which is true currently
      val topics = fromOffsets.keySet
-     val leaders = kc.findLeaders(topics).fold(
-       errs => throw new SparkException(errs.mkString("\n")),
-       ok => ok
-     )
+     val leaders = KafkaCluster.checkErrors(kc.findLeaders(topics))

      batchForTime.toSeq.sortBy(_._1)(Time.ordering).foreach { case (t, b) =>
        logInfo(s"Restoring KafkaRDD for time $t ${b.mkString("[", ", ", "]")}")
external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala

@@ -170,16 +170,17 @@ object KafkaUtils {
      kc: KafkaCluster,
      offsetRanges: Array[OffsetRange]): Unit = {
    val topics = offsetRanges.map(_.topicAndPartition).toSet
-   val badRanges = KafkaCluster.checkErrors(for {
+   val result = for {
      low <- kc.getEarliestLeaderOffsets(topics).right
      high <- kc.getLatestLeaderOffsets(topics).right
    } yield {
      offsetRanges.filterNot { o =>
        low(o.topicAndPartition).offset <= o.fromOffset &&
          o.untilOffset <= high(o.topicAndPartition).offset
      }
-   })
-   if (! badRanges.isEmpty) {
+   }
+   val badRanges = KafkaCluster.checkErrors(result)
+   if (!badRanges.isEmpty) {
      throw new SparkException("Offsets not available on leader: " + badRanges.mkString(","))
    }
  }
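
The restructuring keeps the same Either-based control flow: in pre-2.12 Scala, Either is unbiased, so each step uses .right to get a projection whose flatMap short-circuits on the first Left. A standalone sketch of the pattern, with illustrative types assumed:

object EitherPatternSketch {
  // Each lookup returns Either[Err, T]; the for-comprehension over .right
  // projections stops at the first Left and otherwise threads values through.
  type Err = List[String]

  def lookupLow: Either[Err, Long] = Right(0L)
  def lookupHigh: Either[Err, Long] = Right(100L)

  val inRange: Either[Err, Boolean] = for {
    low <- lookupLow.right
    high <- lookupHigh.right
  } yield low <= 42L && 42L <= high
}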
@@ -418,7 +419,7 @@ object KafkaUtils {
    val kc = new KafkaCluster(kafkaParams)
    val reset = kafkaParams.get("auto.offset.reset").map(_.toLowerCase)

-   KafkaCluster.checkErrors(for {
+   val result = for {
      topicPartitions <- kc.getPartitions(topics).right
      leaderOffsets <- (if (reset == Some("smallest")) {
        kc.getEarliestLeaderOffsets(topicPartitions)
@@ -431,7 +432,8 @@
      }
      new DirectKafkaInputDStream[K, V, KD, VD, (K, V)](
        ssc, kafkaParams, fromOffsets, messageHandler)
-   })
+   }
+   KafkaCluster.checkErrors(result)
  }

  /**
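For context, a caller that creates a direct stream without explicit starting offsets goes through the block above. A hypothetical end-to-end setup against the Spark 1.4-era API, where the broker address and topic name are assumptions:

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object DirectStreamExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("direct-kafka-example").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(2))
    val kafkaParams = Map(
      "metadata.broker.list" -> "localhost:9092", // assumed broker address
      "auto.offset.reset" -> "smallest")          // start from the earliest offsets
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("mytopic"))           // assumed topic name
    // rdd.count on a KafkaRDD is the cheap, offset-range-based count above.
    stream.foreachRDD(rdd => println(s"batch records: ${rdd.count()}"))
    ssc.start()
    ssc.awaitTermination()
  }
}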
