diff --git a/external/flume-sink/src/main/avro/sparkflume.avdl b/external/flume-sink/src/main/avro/sparkflume.avdl
index 9dcc709de079a..fa00b2310a17b 100644
--- a/external/flume-sink/src/main/avro/sparkflume.avdl
+++ b/external/flume-sink/src/main/avro/sparkflume.avdl
@@ -28,7 +28,7 @@ protocol SparkFlumeProtocol {
   record EventBatch {
     string sequenceNumber;
-    array<SparkSinkEvent> eventBatch;
+    array<SparkSinkEvent> events;
   }
 
   EventBatch getEventBatch (int n);
 
diff --git a/external/flume-sink/src/main/scala/org/apache/spark/flume/sink/SparkSink.scala b/external/flume-sink/src/main/scala/org/apache/spark/flume/sink/SparkSink.scala
index 9e70be74d7e6d..521800be64f2c 100644
--- a/external/flume-sink/src/main/scala/org/apache/spark/flume/sink/SparkSink.scala
+++ b/external/flume-sink/src/main/scala/org/apache/spark/flume/sink/SparkSink.scala
@@ -34,10 +34,8 @@ import org.apache.avro.ipc.NettyServer
 import org.apache.avro.ipc.specific.SpecificResponder
 import java.net.InetSocketAddress
 
-class SparkSink() extends AbstractSink with Configurable {
+class SparkSink extends AbstractSink with Configurable {
   private val LOG = LoggerFactory.getLogger(this.getClass)
-  private val lock = new ReentrantLock()
-  private val blockingCondition = lock.newCondition()
 
   // This sink will not persist sequence numbers and reuses them if it gets restarted.
   // So it is possible to commit a transaction which may have been meant for the sink before the
@@ -58,19 +56,20 @@ class SparkSink() extends AbstractSink with Configurable {
   private val processorMap = new ConcurrentHashMap[CharSequence, TransactionProcessor]()
 
-  private var processorFactory: Option[SparkHandlerFactory] = None
+  private var processorManager: Option[TransactionProcessorManager] = None
   private var hostname: String = SparkSinkConfig.DEFAULT_HOSTNAME
   private var port: Int = 0
   private var maxThreads: Int = SparkSinkConfig.DEFAULT_MAX_THREADS
   private var serverOpt: Option[NettyServer] = None
-  private var running = false
+
+  private val blockingLatch = new CountDownLatch(1)
 
   override def start() {
     transactionExecutorOpt = Option(Executors.newFixedThreadPool(numProcessors,
       new ThreadFactoryBuilder().setDaemon(true)
         .setNameFormat("Spark Sink, " + getName + " Processor Thread - %d").build()))
 
-    processorFactory = Option(new SparkHandlerFactory(numProcessors))
+    processorManager = Option(new TransactionProcessorManager(numProcessors))
 
     val responder = new SpecificResponder(classOf[SparkFlumeProtocol], new AvroCallbackHandler())
 
@@ -80,12 +79,6 @@ class SparkSink() extends AbstractSink with Configurable {
     serverOpt = Option(new NettyServer(responder, new InetSocketAddress(hostname, port)))
     serverOpt.map(server => server.start())
-    lock.lock()
-    try {
-      running = true
-    } finally {
-      lock.unlock()
-    }
     super.start()
   }
 
@@ -95,24 +88,15 @@ class SparkSink() extends AbstractSink with Configurable {
       server.close()
       server.join()
     })
-    lock.lock()
-    try {
-      running = false
-      blockingCondition.signalAll()
-    } finally {
-      lock.unlock()
-    }
+    blockingLatch.countDown()
+    super.stop()
   }
 
   override def configure(ctx: Context) {
     import SparkSinkConfig._
     hostname = ctx.getString(CONF_HOSTNAME, DEFAULT_HOSTNAME)
-    val portOpt = Option(ctx.getInteger(CONF_PORT))
-    if(portOpt.isDefined) {
-      port = portOpt.get
-    } else {
-      throw new ConfigurationException("The Port to bind must be specified")
-    }
+    port = Option(ctx.getInteger(CONF_PORT)).
+      getOrElse(throw new ConfigurationException("The port to bind to must be specified"))
     numProcessors = ctx.getInteger(PROCESSOR_COUNT, DEFAULT_PROCESSOR_COUNT)
     transactionTimeout = ctx.getInteger(CONF_TRANSACTION_TIMEOUT, DEFAULT_TRANSACTION_TIMEOUT)
     maxThreads = ctx.getInteger(CONF_MAX_THREADS, DEFAULT_MAX_THREADS)
   }
 
@@ -120,15 +104,9 @@ class SparkSink() extends AbstractSink with Configurable {
   override def process(): Status = {
     // This method is called in a loop by the Flume framework - block it until the sink is
-    // stopped to save CPU resources
-    lock.lock()
-    try {
-      while(running) {
-        blockingCondition.await()
-      }
-    } finally {
-      lock.unlock()
-    }
+    // stopped to save CPU resources. The sink runner will interrupt this thread when the sink is
+    // being shut down.
+    blockingLatch.await()
     Status.BACKOFF
   }
 
@@ -136,24 +114,22 @@ class SparkSink() extends AbstractSink with Configurable {
   // Object representing an empty batch returned by the txn processor due to some error.
   case object ErrorEventBatch extends EventBatch
 
-  private class AvroCallbackHandler() extends SparkFlumeProtocol {
+  private class AvroCallbackHandler extends SparkFlumeProtocol {
     override def getEventBatch(n: Int): EventBatch = {
-      val processor = processorFactory.get.checkOut(n)
+      val processor = processorManager.get.checkOut(n)
       transactionExecutorOpt.map(executor => executor.submit(processor))
       // Wait until a batch is available - can be null if some error was thrown
-      val eventBatch = processor.eventQueue.take()
-      eventBatch match {
+      processor.eventQueue.take() match {
        case ErrorEventBatch => throw new FlumeException("Something went wrong. No events" +
          " retrieved from channel.")
-        case events => {
-          processorMap.put(events.getSequenceNumber, processor)
+        case eventBatch: EventBatch =>
+          processorMap.put(eventBatch.getSequenceNumber, processor)
          if (LOG.isDebugEnabled) {
-            LOG.debug("Sent " + events.getEventBatch.size() +
-              " events with sequence number: " + events.getSequenceNumber)
+            LOG.debug("Sent " + eventBatch.getEvents.size() +
+              " events with sequence number: " + eventBatch.getSequenceNumber)
          }
-          events
-        }
+          eventBatch
       }
     }
 
@@ -214,41 +190,23 @@ class SparkSink() extends AbstractSink with Configurable {
       tx.begin()
       try {
         eventBatch.setSequenceNumber(seqBase + seqNum.incrementAndGet())
-        val events = eventBatch.getEventBatch
+        val events = eventBatch.getEvents
         events.clear()
         val loop = new Breaks
         var gotEventsInThisTxn = false
         loop.breakable {
-          var i = 0
-          // Using for here causes the maxBatchSize change to be ineffective as the Range gets
-          // pregenerated
-          while (i < maxBatchSize) {
-            i += 1
-            val eventOpt = Option(getChannel.take())
-            eventOpt.map(event => {
-              events.add(new SparkSinkEvent(toCharSequenceMap(event
-                .getHeaders),
-                ByteBuffer.wrap(event.getBody)))
-              gotEventsInThisTxn = true
-            })
-            if (eventOpt.isEmpty) {
-              if (!gotEventsInThisTxn) {
-                // To avoid sending empty batches, we wait till events are available backing off
-                // between attempts to get events. Each attempt to get an event though causes one
-                // iteration to be lost. To ensure that we still send back maxBatchSize number of
-                // events, we cheat and increase the maxBatchSize by 1 to account for the lost
-                // iteration. Even throwing an exception is expensive as Avro will serialize it
-                // and send it over the wire, which is useless. Before incrementing though,
-                // ensure that we are not anywhere near INT_MAX.
-                if (maxBatchSize >= Int.MaxValue / 2) {
-                  // Random sanity check
-                  throw new RuntimeException("Safety exception - polled too many times, no events!")
+          while (events.size() < maxBatchSize) {
+            Option(getChannel.take()) match {
+              case Some(event) =>
+                events.add(new SparkSinkEvent(toCharSequenceMap(event.getHeaders),
+                  ByteBuffer.wrap(event.getBody)))
+                gotEventsInThisTxn = true
+              case None =>
+                if (!gotEventsInThisTxn) {
+                  Thread.sleep(500)
+                } else {
+                  loop.break()
                 }
-                maxBatchSize += 1
-                Thread.sleep(500)
-              } else {
-                loop.break()
-              }
             }
           }
         }
@@ -284,7 +242,7 @@ class SparkSink() extends AbstractSink with Configurable {
       } finally {
         resultQueueUpdateLock.unlock()
       }
-      eventBatch.getEventBatch.clear()
+      eventBatch.getEvents.clear()
       // If the batch failed on spark side, throw a FlumeException
       maybeResult.map(success =>
         if (!success) {
@@ -315,7 +273,7 @@ class SparkSink() extends AbstractSink with Configurable {
       // remove the event from the map and then clear the value
       resultQueue.clear()
       processorMap.remove(eventBatch.getSequenceNumber)
-      processorFactory.get.checkIn(this)
+      processorManager.get.checkIn(this)
       tx.close()
     }
   }
@@ -328,7 +286,7 @@ class SparkSink() extends AbstractSink with Configurable {
     }
   }
 
-  private class SparkHandlerFactory(val maxInstances: Int) {
+  private class TransactionProcessorManager(val maxInstances: Int) {
     val queue = new scala.collection.mutable.Queue[TransactionProcessor]
     val queueModificationLock = new ReentrantLock()
     var currentSize = 0
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala
index 495f569f6359d..ee337b5f5507f 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala
@@ -77,13 +77,13 @@ private[streaming] class FlumePollingReceiver(
   private var connections = Array.empty[FlumeConnection] // temporarily empty, filled in later
 
   override def onStart(): Unit = {
-    val connectionBuilder = new mutable.ArrayBuilder.ofRef[FlumeConnection]()
-    addresses.map(host => {
+    // Create the connections to each Flume agent.
+    connections = addresses.map(host => {
       val transceiver = new NettyTransceiver(host, channelFactory)
       val client = SpecificRequestor.getClient(classOf[SparkFlumeProtocol.Callback], transceiver)
-      connectionBuilder += new FlumeConnection(transceiver, client)
-    })
-    connections = connectionBuilder.result()
+      new FlumeConnection(transceiver, client)
+    }).toArray
+
     val dataReceiver = new Runnable {
       override def run(): Unit = {
         var counter = 0
@@ -93,14 +93,18 @@ private[streaming] class FlumePollingReceiver(
             counter += 1
             val batch = client.getEventBatch(maxBatchSize)
             val seq = batch.getSequenceNumber
-            val events: java.util.List[SparkSinkEvent] = batch.getEventBatch
+            val events: java.util.List[SparkSinkEvent] = batch.getEvents
             logDebug("Received batch of " + events.size() + " events with sequence number: " + seq)
             try {
               events.foreach(event => store(SparkPollingEvent.fromSparkSinkEvent(event)))
               client.ack(seq)
             } catch {
               case e: Throwable =>
-                client.nack(seq)
+                try {
+                  client.nack(seq) // If the agent is down, even this could fail and throw
+                } catch {
+                  case e: Throwable => logError("Sending Nack also failed. A Flume agent is down.")
+                }
                 TimeUnit.SECONDS.sleep(2L) // for now just leave this as a fixed 2 seconds.
logWarning("Error while attempting to store events", e) }