From 8136aa60068e9a526403453c525afb2641acf1df Mon Sep 17 00:00:00 2001 From: Hari Shreedharan Date: Mon, 14 Jul 2014 16:12:12 -0700 Subject: [PATCH] Adding TransactionProcessor to map on returning batch of data --- .../org/apache/spark/flume/sink/SparkAvroCallbackHandler.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/external/flume-sink/src/main/scala/org/apache/spark/flume/sink/SparkAvroCallbackHandler.scala b/external/flume-sink/src/main/scala/org/apache/spark/flume/sink/SparkAvroCallbackHandler.scala index 6a7c591455d5f..1f6e60815dd06 100644 --- a/external/flume-sink/src/main/scala/org/apache/spark/flume/sink/SparkAvroCallbackHandler.scala +++ b/external/flume-sink/src/main/scala/org/apache/spark/flume/sink/SparkAvroCallbackHandler.scala @@ -62,6 +62,7 @@ private class SparkAvroCallbackHandler(val threads: Int, val channel: Channel, transactionExecutorOpt.map(executor => { executor.submit(processor) }) + processorMap.put(sequenceNumber, processor) // Wait until a batch is available - will be an error if processor.getEventBatch }