Commit d24d9d4
SPARK-1729. Make Flume pull data from source, rather than the current push model

Update to the previous patch, fixing some error cases and also excluding the Netty dependencies. Also updated the unit tests.
harishreedharan committed May 18, 2014
1 parent 6d6776a commit d24d9d4
Showing 3 changed files with 78 additions and 40 deletions.
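For context, the consumer side of the new pull model looks roughly like the sketch below. It mirrors the createPollingStream call in the updated FlumePollingReceiverSuite in this commit; the SparkConf settings, host, port, import paths, and the reading of the two numeric arguments as per-poll batch size and polling parallelism are assumptions for illustration, not part of this change.

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.flume.FlumeUtils

object FlumePollingSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("FlumePollingSketch")
    val ssc = new StreamingContext(conf, Seconds(1))

    // Spark polls the Flume SparkSink instead of Flume pushing events at a receiver.
    // "localhost"/9999 stand in for wherever the SparkSink is listening; 100 and 1
    // follow the test's arguments (assumed to mean batch size per poll and parallelism).
    val stream = FlumeUtils.createPollingStream(
      ssc, "localhost", 9999, 100, 1, StorageLevel.MEMORY_AND_DISK)

    stream.map(e => new String(e.event.getBody.array(), "utf-8")).print()

    ssc.start()
    ssc.awaitTermination()
  }
}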
@@ -30,9 +30,8 @@ import java.util.concurrent._
import java.util
import org.apache.flume.conf.{ConfigurationException, Configurable}
import com.google.common.util.concurrent.ThreadFactoryBuilder
import org.apache.avro.ipc.{NettyTransceiver, NettyServer}
import org.apache.avro.ipc.NettyServer
import org.apache.avro.ipc.specific.SpecificResponder
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory
import java.net.InetSocketAddress

class SparkSink() extends AbstractSink with Configurable {
@@ -75,12 +74,10 @@ class SparkSink() extends AbstractSink with Configurable {

val responder = new SpecificResponder(classOf[SparkFlumeProtocol], new AvroCallbackHandler())

serverOpt = Option(new NettyServer(responder, new InetSocketAddress(hostname, port),
new NioServerSocketChannelFactory(
Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat(
"Spark Sink " + classOf[NettyTransceiver].getSimpleName + " Boss-%d").build),
Executors.newFixedThreadPool(maxThreads, new ThreadFactoryBuilder().setNameFormat(
"Spark Sink " + classOf[NettyTransceiver].getSimpleName + " I/O Worker-%d").build))))
// Using the constructor that takes specific thread-pools requires bringing in netty
// dependencies which are being excluded in the build. In practice,
// Netty dependencies are already available on the JVM as Flume would have pulled them in.
serverOpt = Option(new NettyServer(responder, new InetSocketAddress(hostname, port)))

serverOpt.map(server => server.start())
lock.lock()
@@ -93,10 +90,14 @@ class SparkSink() extends AbstractSink with Configurable {
}

override def stop() {
transactionExecutorOpt.map(executor => executor.shutdownNow())
serverOpt.map(server => {
server.close()
server.join()
})
lock.lock()
try {
running = false
transactionExecutorOpt.map(executor => executor.shutdownNow())
blockingCondition.signalAll()
} finally {
lock.unlock()
@@ -131,23 +132,28 @@ class SparkSink() extends AbstractSink with Configurable {
Status.BACKOFF
}


// Object representing an empty batch returned by the txn processor due to some error.
case object ErrorEventBatch extends EventBatch

private class AvroCallbackHandler() extends SparkFlumeProtocol {

override def getEventBatch(n: Int): EventBatch = {
val processor = processorFactory.get.checkOut(n)
transactionExecutorOpt.map(executor => executor.submit(processor))
// Wait until a batch is available - can be null if some error was thrown
val eventBatch = Option(processor.eventQueue.take())
if (eventBatch.isDefined) {
val eventsToBeSent = eventBatch.get
processorMap.put(eventsToBeSent.getSequenceNumber, processor)
if (LOG.isDebugEnabled) {
LOG.debug("Sent " + eventsToBeSent.getEventBatch.size() +
" events with sequence number: " + eventsToBeSent.getSequenceNumber)
val eventBatch = processor.eventQueue.take()
eventBatch match {
case ErrorEventBatch => throw new FlumeException("Something went wrong. No events" +
" retrieved from channel.")
case events => {
processorMap.put(events.getSequenceNumber, processor)
if (LOG.isDebugEnabled) {
LOG.debug("Sent " + events.getEventBatch.size() +
" events with sequence number: " + events.getSequenceNumber)
}
events
}
eventsToBeSent
} else {
throw new FlumeException("Error while trying to retrieve events from the channel.")
}
}
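The ErrorEventBatch handoff above is the usual sentinel-object pattern for blocking queues: java.util.concurrent queues reject null elements, which is presumably why the eventQueue.put(null) in the finally block further down becomes put(ErrorEventBatch). A self-contained sketch of the pattern, with hypothetical names:

import java.util.concurrent.LinkedBlockingQueue

sealed trait Batch
case object ErrorBatch extends Batch                 // sentinel: nothing could be fetched
case class Events(bodies: Seq[String]) extends Batch

object SentinelDemo extends App {
  val queue = new LinkedBlockingQueue[Batch]()

  // Producer (the transaction processor in the sink): even on failure it must
  // unblock the waiting caller, so it enqueues the sentinel rather than null.
  new Thread(new Runnable {
    def run(): Unit = queue.put(ErrorBatch)
  }).start()

  // Consumer (the Avro callback): take() always returns an object, and the
  // sentinel is turned into an exception for the RPC client.
  queue.take() match {
    case ErrorBatch     => println("no events retrieved from channel")
    case Events(bodies) => println("got " + bodies.size + " events")
  }
}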

@@ -211,17 +217,38 @@ class SparkSink() extends AbstractSink with Configurable {
val events = eventBatch.getEventBatch
events.clear()
val loop = new Breaks
var gotEventsInThisTxn = false
loop.breakable {
for (i <- 0 until maxBatchSize) {
var i = 0
// Using for here causes the maxBatchSize change to be ineffective as the Range gets
// pregenerated
while (i < maxBatchSize) {
i += 1
val eventOpt = Option(getChannel.take())

eventOpt.map(event => {
events.add(new SparkSinkEvent(toCharSequenceMap(event
.getHeaders),
ByteBuffer.wrap(event.getBody)))
gotEventsInThisTxn = true
})
if (eventOpt.isEmpty) {
loop.break()
if (!gotEventsInThisTxn) {
// To avoid sending empty batches, we wait till events are available backing off
// between attempts to get events. Each attempt to get an event though causes one
// iteration to be lost. To ensure that we still send back maxBatchSize number of
// events, we cheat and increase the maxBatchSize by 1 to account for the lost
// iteration. Even throwing an exception is expensive as Avro will serialize it
// and send it over the wire, which is useless. Before incrementing though,
// ensure that we are not anywhere near INT_MAX.
if (maxBatchSize >= Int.MaxValue / 2) {
// Random sanity check
throw new RuntimeException("Safety exception - polled too many times, no events!")
}
maxBatchSize += 1
Thread.sleep(500)
} else {
loop.break()
}
}
}
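The switch from a for-comprehension to a while loop above is worth a standalone illustration: 0 until maxBatchSize materializes a Range once, so growing the bound inside the body adds no iterations, whereas a while loop re-reads the bound on every test. A minimal demo, separate from the sink code itself:

object RangeVsWhileDemo extends App {
  // for-comprehension: Range(0, 3) is built up front, so the bump is ignored.
  var limit = 3
  var forRuns = 0
  for (i <- 0 until limit) {
    if (i == 0) limit += 2
    forRuns += 1
  }
  println("for ran " + forRuns + " times")       // 3 times, despite limit growing to 5

  // while loop: the condition re-reads limit, so the same bump buys extra passes.
  limit = 3
  var i = 0
  var whileRuns = 0
  while (i < limit) {
    if (i == 0) limit += 2
    i += 1
    whileRuns += 1
  }
  println("while ran " + whileRuns + " times")   // 5 times
}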
}
Expand Down Expand Up @@ -283,7 +310,7 @@ class SparkSink() extends AbstractSink with Configurable {
null // No point rethrowing the exception
} finally {
// Must *always* release the caller thread
eventQueue.put(null)
eventQueue.put(ErrorEventBatch)
// In the case of success coming after the timeout, but before resetting the seq number
// remove the event from the map and then clear the value
resultQueue.clear()
@@ -22,7 +22,6 @@ import org.apache.spark.streaming.{TestSuiteBase, TestOutputStream, StreamingCon
import org.apache.spark.storage.StorageLevel
import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer}
import org.apache.spark.streaming.util.ManualClock
import java.nio.charset.Charset
import org.apache.flume.channel.MemoryChannel
import org.apache.flume.Context
import org.apache.flume.conf.Configurables
@@ -39,7 +38,7 @@ class FlumePollingReceiverSuite extends TestSuiteBase {
// Set up the streaming context and input streams
val ssc = new StreamingContext(conf, batchDuration)
val flumeStream: ReceiverInputDStream[SparkPollingEvent] =
FlumeUtils.createPollingStream(ssc, "localhost", testPort, 100, 5,
FlumeUtils.createPollingStream(ssc, "localhost", testPort, 100, 1,
StorageLevel.MEMORY_AND_DISK)
val outputBuffer = new ArrayBuffer[Seq[SparkPollingEvent]]
with SynchronizedBuffer[Seq[SparkPollingEvent]]
@@ -63,15 +62,17 @@ class FlumePollingReceiverSuite extends TestSuiteBase {
ssc.start()

val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
val input = Seq(1, 2, 3, 4, 5)
var t = 0
for (i <- 0 until 5) {
val tx = channel.getTransaction
tx.begin()
for (j <- 0 until input.size) {
for (j <- 0 until 5) {
channel.put(EventBuilder.withBody(
(String.valueOf(i) + input(j)).getBytes("utf-8"),
Map[String, String]("test-" + input(j).toString -> "header")))
String.valueOf(t).getBytes("utf-8"),
Map[String, String]("test-" + t.toString -> "header")))
t += 1
}

tx.commit()
tx.close()
Thread.sleep(500) // Allow some time for the events to reach
@@ -86,19 +87,30 @@ class FlumePollingReceiverSuite extends TestSuiteBase {
assert(timeTaken < maxWaitTimeMillis, "Operation timed out after " + timeTaken + " ms")
logInfo("Stopping context")
ssc.stop()
sink.stop()
channel.stop()

val decoder = Charset.forName("UTF-8").newDecoder()

assert(outputBuffer.size === 5)
val flattenedBuffer = outputBuffer.flatten
assert(flattenedBuffer.size === 25)
var counter = 0
for (i <- 0 until outputBuffer.size;
j <- 0 until outputBuffer(i).size) {
counter += 1
val eventToVerify = outputBuffer(i)(j).event
val str = decoder.decode(eventToVerify.getBody)
assert(str.toString === (String.valueOf(i) + input(j)))
assert(eventToVerify.getHeaders.get("test-" + input(j).toString) === "header")
for (i <- 0 until 25) {
val eventToVerify = EventBuilder.withBody(
String.valueOf(i).getBytes("utf-8"),
Map[String, String]("test-" + i.toString -> "header"))
var found = false
var j = 0
while (j < flattenedBuffer.size && !found) {
val strToCompare = new String(flattenedBuffer(j).event.getBody.array(), "utf-8")
if (new String(eventToVerify.getBody, "utf-8") == strToCompare &&
eventToVerify.getHeaders.get("test-" + i.toString)
.equals(flattenedBuffer(j).event.getHeaders.get("test-" + i.toString))) {
found = true
counter += 1
}
j += 1
}
}
assert (counter === 25)
}

}
1 change: 0 additions & 1 deletion project/SparkBuild.scala
@@ -23,7 +23,6 @@ import sbt.Keys._
import sbt.Task
import sbtassembly.Plugin._
import AssemblyKeys._
import sbtavro.SbtAvro._
import scala.Some
import scala.util.Properties
import org.scalastyle.sbt.ScalastylePlugin.{Settings => ScalaStyleSettings}
