Skip to content

Commit

Permalink
[Streaming][Kafka] Take advantage of offset range info for size-relat…
Browse files Browse the repository at this point in the history
…ed KafkaRDD methods. Possible fix for [SPARK-7122], but probably a worthwhile optimization regardless.
  • Loading branch information
koeninger committed Jun 4, 2015
1 parent c6a6dd0 commit c3768c5
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@

package org.apache.spark.streaming.kafka

import scala.collection.mutable.ArrayBuffer
import scala.reflect.{classTag, ClassTag}

import org.apache.spark.{Logging, Partition, SparkContext, SparkException, TaskContext}
import org.apache.spark.partial.{PartialResult, BoundedDouble}
import org.apache.spark.rdd.RDD
import org.apache.spark.util.NextIterator

Expand Down Expand Up @@ -60,6 +62,49 @@ class KafkaRDD[
}.toArray
}

override def count(): Long = offsetRanges.map(_.count).sum

override def countApprox(
timeout: Long,
confidence: Double = 0.95
): PartialResult[BoundedDouble] = {
val c = count
new PartialResult(new BoundedDouble(c, 1.0, c, c), true)
}

override def isEmpty(): Boolean = count == 0L

override def take(num: Int): Array[R] = {
val nonEmpty = this.partitions
.map(_.asInstanceOf[KafkaRDDPartition])
.filter(_.count > 0)

if (num < 1 || nonEmpty.size < 1) {
return new Array[R](0)
}

var remain = num.toLong
// Determine in advance how many messages need to be taken from each partition
val parts = nonEmpty.flatMap { part =>
if (remain > 0) {
val taken = Math.min(remain, part.count)
remain = remain - taken
Some((part.index -> taken.toInt))
} else {
None
}
}.toMap

val buf = new ArrayBuffer[R]
val res = context.runJob(
this,
(tc: TaskContext, it: Iterator[R]) => it.take(parts(tc.partitionId)).toArray,
parts.keys.toArray,
allowLocal = true)
res.foreach(buf ++= _)
buf.toArray
}

override def getPreferredLocations(thePart: Partition): Seq[String] = {
val part = thePart.asInstanceOf[KafkaRDDPartition]
// TODO is additional hostname resolution necessary here
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,7 @@ class KafkaRDDPartition(
val untilOffset: Long,
val host: String,
val port: Int
) extends Partition
) extends Partition {
/** Number of messages this partition refers to */
def count(): Long = untilOffset - fromOffset
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ final class OffsetRange private(
val untilOffset: Long) extends Serializable {
import OffsetRange.OffsetRangeTuple

/** Number of messages this OffsetRange refers to */
def count(): Long = untilOffset - fromOffset

override def equals(obj: Any): Boolean = obj match {
case that: OffsetRange =>
this.topic == that.topic &&
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,14 @@ class KafkaRDDSuite extends SparkFunSuite with BeforeAndAfterAll {

val received = rdd.map(_._2).collect.toSet
assert(received === messages)

// size-related method optimizations return sane results
assert(rdd.count === messages.size)
assert(rdd.countApprox(0).getFinalValue.mean === messages.size)
assert(! rdd.isEmpty)
assert(rdd.take(1).size === 1)
assert(messages(rdd.take(1).head._2))
assert(rdd.take(messages.size + 10).size === messages.size)
}

test("iterator boundary conditions") {
Expand Down

0 comments on commit c3768c5

Please sign in to comment.