SPARK-1729. New Flume-Spark integration.
Renamed SparkPollingEvent to SparkFlumePollingEvent.
harishreedharan committed Jun 10, 2014
1 parent d6fa3aa commit 70bcc2a
Showing 3 changed files with 20 additions and 20 deletions.
@@ -53,13 +53,13 @@ class FlumePollingInputDStream[T: ClassTag](
val maxBatchSize: Int,
val parallelism: Int,
storageLevel: StorageLevel
-) extends ReceiverInputDStream[SparkPollingEvent](ssc_) {
+) extends ReceiverInputDStream[SparkFlumePollingEvent](ssc_) {
/**
* Gets the receiver object that will be sent to the worker nodes
* to receive data. This method needs to defined by any specific implementation
* of a NetworkInputDStream.
*/
-override def getReceiver(): Receiver[SparkPollingEvent] = {
+override def getReceiver(): Receiver[SparkFlumePollingEvent] = {
new FlumePollingReceiver(addresses, maxBatchSize, parallelism, storageLevel)
}
}
@@ -69,7 +69,7 @@ private[streaming] class FlumePollingReceiver(
maxBatchSize: Int,
parallelism: Int,
storageLevel: StorageLevel
-) extends Receiver[SparkPollingEvent](storageLevel) with Logging {
+) extends Receiver[SparkFlumePollingEvent](storageLevel) with Logging {

lazy val channelFactoryExecutor =
Executors.newCachedThreadPool(new ThreadFactoryBuilder().setDaemon(true).
@@ -105,7 +105,7 @@ private[streaming] class FlumePollingReceiver(
logDebug("Received batch of " + events.size() + " events with sequence number: " + seq)
try {
// Convert each Flume event to a serializable SparkPollingEvent
-events.foreach(event => store(SparkPollingEvent.fromSparkSinkEvent(event)))
+events.foreach(event => store(SparkFlumePollingEvent.fromSparkSinkEvent(event)))
// Send an ack to Flume so that Flume discards the events from its channels.
client.ack(seq)
} catch {
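The hunk above cuts off inside the try/catch, but it shows the core contract of the polling receiver: a batch is stored into Spark first, and only then acknowledged, so Flume keeps unacknowledged events in its channel for redelivery. A minimal sketch of that loop, assuming the sink protocol's getEventBatch and nack calls (only ack appears in this diff), with storeFn standing in for the receiver's store():

import scala.collection.JavaConverters._

// Hedged sketch, not the actual receiver code: poll one batch, store it,
// then ack so Flume can commit its channel transaction and drop the events.
def pollOnce(
    client: SparkFlumeProtocol.Callback,
    maxBatchSize: Int,
    storeFn: SparkFlumePollingEvent => Unit): Unit = {
  val batch = client.getEventBatch(maxBatchSize) // assumed protocol call
  val seq = batch.getSequenceNumber
  try {
    // Store first: the data must be safely in Spark before Flume forgets it.
    batch.getEvents.asScala.foreach { e =>
      storeFn(SparkFlumePollingEvent.fromSparkSinkEvent(e))
    }
    client.ack(seq) // Flume commits and discards the batch from its channel.
  } catch {
    case t: Throwable =>
      client.nack(seq) // assumed: Flume rolls back and redelivers the batch.
      throw t
  }
}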
@@ -129,7 +129,7 @@ private[streaming] class FlumePollingReceiver(
}
}

-override def store(dataItem: SparkPollingEvent) {
+override def store(dataItem: SparkFlumePollingEvent) {
// Not entirely sure store is thread-safe for all storage levels - so wrap it in synchronized
// This takes a performance hit, since the parallelism is useful only for pulling data now.
this.synchronized {
@@ -155,9 +155,9 @@ private[streaming] class FlumePollingReceiver(
private class FlumeConnection(val transceiver: NettyTransceiver,
val client: SparkFlumeProtocol.Callback)

-private[streaming] object SparkPollingEvent {
-def fromSparkSinkEvent(in: SparkSinkEvent): SparkPollingEvent = {
-val event = new SparkPollingEvent()
+private[streaming] object SparkFlumePollingEvent {
+def fromSparkSinkEvent(in: SparkSinkEvent): SparkFlumePollingEvent = {
+val event = new SparkFlumePollingEvent()
event.event = in
event
}
@@ -167,7 +167,7 @@ private[streaming] object SparkPollingEvent {
* SparkSinkEvent is identical to AvroFlumeEvent, we need to create a new class and a wrapper
* around that to make it externalizable.
*/
-class SparkPollingEvent() extends Externalizable with Logging {
+class SparkFlumePollingEvent() extends Externalizable with Logging {
var event : SparkSinkEvent = new SparkSinkEvent()

/* De-serialize from bytes. */
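The serialization bodies are truncated above, but the pattern the doc comment describes is simple: the Avro-generated SparkSinkEvent is not Java-serializable, so the wrapper implements Externalizable and hand-rolls a compact wire format. A simplified, self-contained sketch of the same idea; the class and field names here are illustrative, not Spark's:

import java.io.{Externalizable, ObjectInput, ObjectOutput}

// Hypothetical wrapper showing the Externalizable pattern used above:
// write length-prefixed body bytes, then the header map as UTF pairs.
class EnvelopeEvent() extends Externalizable {
  var body: Array[Byte] = Array.empty
  var headers: Map[String, String] = Map.empty

  override def writeExternal(out: ObjectOutput): Unit = {
    out.writeInt(body.length)
    out.write(body)
    out.writeInt(headers.size)
    headers.foreach { case (k, v) => out.writeUTF(k); out.writeUTF(v) }
  }

  override def readExternal(in: ObjectInput): Unit = {
    body = new Array[Byte](in.readInt())
    in.readFully(body)
    val n = in.readInt()
    headers = (0 until n).map(_ => in.readUTF() -> in.readUTF()).toMap
  }
}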
@@ -89,8 +89,8 @@ object FlumeUtils {
maxBatchSize: Int = 100,
parallelism: Int = 5,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
-): ReceiverInputDStream[SparkPollingEvent] = {
-new FlumePollingInputDStream[SparkPollingEvent](ssc, addresses, maxBatchSize,
+): ReceiverInputDStream[SparkFlumePollingEvent] = {
+new FlumePollingInputDStream[SparkFlumePollingEvent](ssc, addresses, maxBatchSize,
parallelism, storageLevel)
}

@@ -111,8 +111,8 @@ object FlumeUtils {
maxBatchSize: Int = 100,
parallelism: Int = 5,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
-): JavaReceiverInputDStream[SparkPollingEvent] = {
-new FlumePollingInputDStream[SparkPollingEvent](ssc, addresses, maxBatchSize,
+): JavaReceiverInputDStream[SparkFlumePollingEvent] = {
+new FlumePollingInputDStream[SparkFlumePollingEvent](ssc, addresses, maxBatchSize,
parallelism, storageLevel)
}
}
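After the rename, the polling API above is used like this from application code. A hedged usage sketch: the app name, host, port, and batch interval are illustrative, and getBody is assumed to mirror AvroFlumeEvent's ByteBuffer accessor, since the doc comment says SparkSinkEvent is identical to AvroFlumeEvent:

import java.net.InetSocketAddress
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.flume.FlumeUtils

// Connect to a Flume agent running the Spark sink on localhost:9999
// (illustrative address) and print each event body as a string.
val conf = new SparkConf().setAppName("FlumePollingExample")
val ssc = new StreamingContext(conf, Seconds(2))
val stream = FlumeUtils.createPollingStream(
  ssc,
  Seq(new InetSocketAddress("localhost", 9999)),
  maxBatchSize = 100,
  parallelism = 5,
  storageLevel = StorageLevel.MEMORY_AND_DISK_SER_2)
stream.map(e => new String(e.event.getBody.array(), "UTF-8")).print()
ssc.start()
ssc.awaitTermination()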
@@ -42,11 +42,11 @@ class FlumePollingReceiverSuite extends TestSuiteBase {
test("flume polling test") {
// Set up the streaming context and input streams
val ssc = new StreamingContext(conf, batchDuration)
-val flumeStream: ReceiverInputDStream[SparkPollingEvent] =
+val flumeStream: ReceiverInputDStream[SparkFlumePollingEvent] =
FlumeUtils.createPollingStream(ssc, Seq(new InetSocketAddress("localhost", testPort)), 100, 5,
StorageLevel.MEMORY_AND_DISK)
-val outputBuffer = new ArrayBuffer[Seq[SparkPollingEvent]]
-with SynchronizedBuffer[Seq[SparkPollingEvent]]
+val outputBuffer = new ArrayBuffer[Seq[SparkFlumePollingEvent]]
+with SynchronizedBuffer[Seq[SparkFlumePollingEvent]]
val outputStream = new TestOutputStream(flumeStream, outputBuffer)
outputStream.register()

@@ -73,12 +73,12 @@ class FlumePollingReceiverSuite extends TestSuiteBase {
test("flume polling test multiple hosts") {
// Set up the streaming context and input streams
val ssc = new StreamingContext(conf, batchDuration)
-val flumeStream: ReceiverInputDStream[SparkPollingEvent] =
+val flumeStream: ReceiverInputDStream[SparkFlumePollingEvent] =
FlumeUtils.createPollingStream(ssc, Seq(new InetSocketAddress("localhost", testPort),
new InetSocketAddress("localhost", testPort + 1)), 100, 5,
StorageLevel.MEMORY_AND_DISK)
-val outputBuffer = new ArrayBuffer[Seq[SparkPollingEvent]]
-with SynchronizedBuffer[Seq[SparkPollingEvent]]
+val outputBuffer = new ArrayBuffer[Seq[SparkFlumePollingEvent]]
+with SynchronizedBuffer[Seq[SparkFlumePollingEvent]]
val outputStream = new TestOutputStream(flumeStream, outputBuffer)
outputStream.register()

@@ -114,7 +114,7 @@ class FlumePollingReceiverSuite extends TestSuiteBase {
}

def writeAndVerify(channels: Seq[MemoryChannel], ssc: StreamingContext,
-outputBuffer: ArrayBuffer[Seq[SparkPollingEvent]]) {
+outputBuffer: ArrayBuffer[Seq[SparkFlumePollingEvent]]) {
val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
val executor = Executors.newCachedThreadPool()
val executorCompletion = new ExecutorCompletionService[Void](executor)
