Merge pull request #1 from tdas/flume-polling
Bunch of changes to the new Flume stuff
harishreedharan committed Jul 17, 2014
2 parents 1edc806 + 10b6214 commit d248d22
Showing 11 changed files with 226 additions and 136 deletions.
@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.examples.streaming

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
import org.apache.spark.streaming.flume._
import org.apache.spark.util.IntParam
import java.net.InetSocketAddress

/**
* Produces a count of events received from Flume.
*
* This should be used in conjunction with the Spark Sink running in a Flume agent. See
* the Spark Streaming programming guide for more details.
*
* Usage: FlumePollingEventCount <host> <port>
* `host` is the host on which the Spark Sink is running.
* `port` is the port at which the Spark Sink is listening.
*
* To run this example:
* `$ bin/run-example org.apache.spark.examples.streaming.FlumePollingEventCount [host] [port] `
*/
object FlumePollingEventCount {
def main(args: Array[String]) {
if (args.length < 2) {
System.err.println(
"Usage: FlumePollingEventCount <host> <port>")
System.exit(1)
}

StreamingExamples.setStreamingLogLevels()

val Array(host, IntParam(port)) = args

val batchInterval = Milliseconds(2000)

// Create the context and set the batch size
val sparkConf = new SparkConf().setAppName("FlumePollingEventCount")
val ssc = new StreamingContext(sparkConf, batchInterval)

// Create a flume stream that polls the Spark Sink running in a Flume agent
val stream = FlumeUtils.createPollingStream(ssc, host, port)

// Print out the count of events received from this server in each batch
stream.count().map(cnt => "Received " + cnt + " flume events." ).print()

ssc.start()
ssc.awaitTermination()
}
}
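
The example above polls a single Spark Sink. As a point of reference, here is a minimal sketch of polling several sinks at once, assuming an overload of FlumeUtils.createPollingStream that takes a Seq[InetSocketAddress] and a StorageLevel (mirroring the addresses parameter of FlumePollingInputDStream later in this diff); the hostnames and ports are placeholders, and this object is not part of the commit:

import java.net.InetSocketAddress

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.flume.FlumeUtils

// Hypothetical example: poll two Flume agents, each running a SparkSink on port 9999.
object MultiSinkPollingEventCount {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("MultiSinkPollingEventCount")
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    val addresses = Seq(
      new InetSocketAddress("agent1.example.com", 9999),
      new InetSocketAddress("agent2.example.com", 9999))

    // Assumed overload: createPollingStream(ssc, addresses, storageLevel)
    val stream = FlumeUtils.createPollingStream(
      ssc, addresses, StorageLevel.MEMORY_AND_DISK_SER_2)

    // Same per-batch count as the single-sink example above
    stream.count().map(cnt => "Received " + cnt + " flume events.").print()

    ssc.start()
    ssc.awaitTermination()
  }
}
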
3 changes: 1 addition & 2 deletions external/flume-sink/src/main/avro/sparkflume.avdl
@@ -17,7 +17,7 @@
* under the License.
*/

@namespace("org.apache.spark.flume")
@namespace("org.apache.spark.streaming.flume.sink")

protocol SparkFlumeProtocol {

@@ -37,5 +37,4 @@ protocol SparkFlumeProtocol {
void ack (string sequenceNumber);

void nack (string sequenceNumber);

}
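
The receiver drives this protocol: it pulls a batch from the sink, stores the events, then calls ack so the sink commits its Flume transaction, or nack so it rolls back. A rough sketch of one round trip, assuming the Avro-generated stub and a getEventBatch(n) fetch method whose declaration is collapsed in the hunk above (its exact name and the EventBatch accessors are assumptions here):

import org.apache.spark.streaming.flume.sink.{EventBatch, SparkFlumeProtocol}

// Sketch only: one poll/ack/nack round trip against a SparkFlumeProtocol stub.
object SparkSinkPollSketch {
  def pollOnce(client: SparkFlumeProtocol.Callback, maxBatchSize: Int): Unit = {
    val batch: EventBatch = client.getEventBatch(maxBatchSize) // assumed fetch RPC
    val seq = batch.getSequenceNumber                          // assumed Avro accessor
    try {
      // hand the events to the Spark receiver's store() here ...
      client.ack(seq)    // sink commits its Flume transaction
    } catch {
      case e: Exception =>
        client.nack(seq) // sink rolls the Flume transaction back
    }
  }
}
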
@@ -14,12 +14,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.flume.sink
package org.apache.spark.streaming.flume.sink

import org.apache.log4j.{LogManager, PropertyConfigurator}
import org.slf4j.{Logger, LoggerFactory}
import org.slf4j.impl.StaticLoggerBinder

trait Logging {
/**
* Copy of the org.apache.spark.Logging for being used in the Spark Sink.
* The org.apache.spark.Logging is not used so that all of Spark is not brought
* in as a dependency.
*/
private[sink] trait Logging {
// Make the log field transient so that objects with Logging can
// be serialized and used on another machine
@transient private var log_ : Logger = null
@@ -95,20 +101,6 @@ trait Logging {
}

private def initializeLogging() {
// If Log4j is being used, but is not initialized, load a default properties file
val binder = StaticLoggerBinder.getSingleton
val usingLog4j = binder.getLoggerFactoryClassStr.endsWith("Log4jLoggerFactory")
val log4jInitialized = LogManager.getRootLogger.getAllAppenders.hasMoreElements
if (!log4jInitialized && usingLog4j) {
val defaultLogProps = "org/apache/spark/log4j-defaults.properties"
Option(getClass.getClassLoader.getResource(defaultLogProps)) match {
case Some(url) =>
PropertyConfigurator.configure(url)
log.info(s"Using Spark's default log4j profile: $defaultLogProps")
case None =>
System.err.println(s"Spark was unable to load $defaultLogProps")
}
}
Logging.initialized = true

// Force a call into slf4j to initialize it. Avoids this happening from multiple threads
@@ -117,7 +109,7 @@
}
}

private object Logging {
private[sink] object Logging {
@volatile private var initialized = false
val initLock = new Object()
try {
@@ -14,17 +14,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.flume.sink
package org.apache.spark.streaming.flume.sink

import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.{ConcurrentHashMap, Executors}
import java.util.concurrent.atomic.AtomicLong

import com.google.common.util.concurrent.ThreadFactoryBuilder

import org.apache.commons.lang.RandomStringUtils
import org.apache.flume.Channel
import org.apache.spark.flume.{EventBatch, SparkFlumeProtocol}
import org.slf4j.LoggerFactory
import org.apache.commons.lang.RandomStringUtils
import com.google.common.util.concurrent.ThreadFactoryBuilder

/**
* Class that implements the SparkFlumeProtocol, that is used by the Avro Netty Server to process
@@ -34,7 +31,7 @@ import org.slf4j.LoggerFactory
* @param transactionTimeout Timeout in millis after which the transaction if not acked by Spark
* is rolled back.
*/
private class SparkAvroCallbackHandler(val threads: Int, val channel: Channel,
private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Channel,
val transactionTimeout: Int, val backOffInterval: Int) extends SparkFlumeProtocol with Logging {
val transactionExecutorOpt = Option(Executors.newFixedThreadPool(threads,
new ThreadFactoryBuilder().setDaemon(true)
@@ -109,7 +106,7 @@ private class SparkAvroCallbackHandler(val threads: Int, val channel: Channel,
* @return The transaction processor for the corresponding batch. Note that this instance is no
* longer tracked and the caller is responsible for that txn processor.
*/
private[flume] def removeAndGetProcessor(sequenceNumber: CharSequence): TransactionProcessor = {
private[sink] def removeAndGetProcessor(sequenceNumber: CharSequence): TransactionProcessor = {
processorMap.remove(sequenceNumber.toString) // The toString is required!
}

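
A note on the "The toString is required!" comment in removeAndGetProcessor above: Avro passes string arguments as CharSequence, typically org.apache.avro.util.Utf8, and a Utf8 key does not compare equal to the java.lang.String key under which the processor was registered, so lookups must normalize to String first. A small illustrative sketch (the map's value type stands in for TransactionProcessor):

import java.util.concurrent.ConcurrentHashMap

import org.apache.avro.util.Utf8

object SequenceNumberLookupSketch {
  def main(args: Array[String]) {
    // String values stand in for TransactionProcessor instances.
    val processorMap = new ConcurrentHashMap[String, String]()
    processorMap.put("seq-42", "processor for seq-42")

    val fromAvro: CharSequence = new Utf8("seq-42")
    println(processorMap.get(fromAvro))          // null: a Utf8 key is not equal to the String key
    println(processorMap.get(fromAvro.toString)) // processor for seq-42
  }
}
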
@@ -14,20 +14,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.flume.sink
package org.apache.spark.streaming.flume.sink

import java.net.InetSocketAddress
import java.util.concurrent._

import org.apache.avro.ipc.NettyServer
import org.apache.avro.ipc.specific.SpecificResponder
import org.apache.flume.Context
import org.apache.flume.Sink.Status
import org.apache.flume.conf.{Configurable, ConfigurationException}
import org.apache.flume.sink.AbstractSink
import org.apache.flume.Context
import org.slf4j.LoggerFactory

import org.apache.spark.flume.SparkFlumeProtocol

/**
* A sink that uses Avro RPC to run a server that can be polled by Spark's
@@ -48,6 +45,7 @@ import org.apache.spark.flume.SparkFlumeProtocol
// until an ACK or NACK comes back or the transaction times out (after the specified timeout).
// When the response comes, the TransactionProcessor is retrieved and then unblocked,
// at which point the transaction is committed or rolled back.
private[flume]
class SparkSink extends AbstractSink with Logging with Configurable {

// Size of the pool to use for holding transaction processors.
@@ -130,6 +128,7 @@ class SparkSink extends AbstractSink with Logging with Configurable {
/**
* Configuration parameters and their defaults.
*/
private[flume]
object SparkSinkConfig {
val THREADS = "threads"
val DEFAULT_THREADS = 10
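
The "blocked ... until an ACK or NACK comes back or the transaction times out" behaviour described in the comment above is the classic latch pattern: the thread holding the Flume transaction waits on a latch that the RPC callback counts down. A minimal illustrative sketch, not the actual TransactionProcessor code (the timeout is in milliseconds, per the transactionTimeout doc earlier in this diff):

import java.util.concurrent.{CountDownLatch, TimeUnit}

// Illustrative ack/nack wait, not the actual TransactionProcessor implementation.
class AckWaiter(transactionTimeout: Long) {
  private val latch = new CountDownLatch(1)
  @volatile private var acked = false

  def onAck(): Unit = { acked = true; latch.countDown() }
  def onNack(): Unit = latch.countDown()

  // True means Spark acked in time and the Flume transaction can be committed;
  // false (nack or timeout) means it should be rolled back.
  def awaitResult(): Boolean = {
    latch.await(transactionTimeout, TimeUnit.MILLISECONDS)
    acked
  }
}
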
@@ -14,11 +14,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.flume.sink
package org.apache.spark.streaming.flume.sink

import org.apache.spark.flume.EventBatch

object SparkSinkUtils {
private[flume] object SparkSinkUtils {
/**
* This method determines if this batch represents an error or not.
* @param batch - The batch to check
@@ -14,18 +14,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.flume.sink
package org.apache.spark.streaming.flume.sink

import java.nio.ByteBuffer
import java.util
import java.util.concurrent.{TimeUnit, CountDownLatch, Callable}
import java.util.concurrent.{Callable, CountDownLatch, TimeUnit}

import scala.util.control.Breaks

import org.apache.flume.{Transaction, Channel}
import org.apache.spark.flume.{SparkSinkEvent, EventBatch}
import org.slf4j.LoggerFactory


// Flume forces transactions to be thread-local (horrible, I know!)
// So the sink basically spawns a new thread to pull the events out within a transaction.
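
Because Flume binds a transaction to the thread that opened it (the point of the comment above), the whole take/await-ack/commit-or-rollback sequence has to run on one worker thread. A rough sketch of that shape using only the public Flume Channel/Transaction API; the real TransactionProcessor is more involved:

import java.util.concurrent.{Callable, ExecutorService}

import org.apache.flume.Channel

object SingleThreadTransactionSketch {
  // processBatch stands in for taking events off the channel and waiting for Spark's ack.
  def submit(executor: ExecutorService, channel: Channel)(processBatch: () => Boolean): Unit = {
    executor.submit(new Callable[Unit] {
      override def call(): Unit = {
        // begin, commit/rollback and close all happen on this one thread
        val tx = channel.getTransaction
        tx.begin()
        try {
          if (processBatch()) tx.commit() else tx.rollback()
        } catch {
          case e: Exception => tx.rollback(); throw e
        } finally {
          tx.close()
        }
      }
    })
  }
}
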
@@ -22,8 +22,6 @@ import java.net.InetSocketAddress
import java.nio.ByteBuffer
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit, Executors}

import org.apache.spark.flume.sink.SparkSinkUtils

import scala.collection.JavaConversions._
import scala.reflect.ClassTag

@@ -33,45 +31,44 @@ import org.apache.avro.ipc.specific.SpecificRequestor
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory

import org.apache.spark.Logging
import org.apache.spark.flume.{SparkSinkEvent, SparkFlumeProtocol}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.streaming.flume.sink._


/**
* A [[ReceiverInputDStream]] that can be used to read data from several Flume agents running
* [[org.apache.spark.flume.sink.SparkSink]]s.
* @param ssc_ Streaming context that will execute this input stream
* @param _ssc Streaming context that will execute this input stream
* @param addresses List of addresses at which SparkSinks are listening
* @param maxBatchSize Maximum size of a batch
* @param parallelism Number of parallel connections to open
* @param storageLevel The storage level to use.
* @tparam T Class type of the object of this stream
*/
private[streaming]
class FlumePollingInputDStream[T: ClassTag](
@transient ssc_ : StreamingContext,
val addresses: Seq[InetSocketAddress],
val maxBatchSize: Int,
val parallelism: Int,
storageLevel: StorageLevel
) extends ReceiverInputDStream[SparkFlumePollingEvent](ssc_) {
/**
* Gets the receiver object that will be sent to the worker nodes
* to receive data. This method needs to defined by any specific implementation
* of a NetworkInputDStream.
*/
@transient _ssc: StreamingContext,
val addresses: Seq[InetSocketAddress],
val maxBatchSize: Int,
val parallelism: Int,
storageLevel: StorageLevel
) extends ReceiverInputDStream[SparkFlumePollingEvent](_ssc) {

override def getReceiver(): Receiver[SparkFlumePollingEvent] = {
new FlumePollingReceiver(addresses, maxBatchSize, parallelism, storageLevel)
}
}

private[streaming] class FlumePollingReceiver(
addresses: Seq[InetSocketAddress],
maxBatchSize: Int,
parallelism: Int,
storageLevel: StorageLevel
) extends Receiver[SparkFlumePollingEvent](storageLevel) with Logging {
private[streaming]
class FlumePollingReceiver(
addresses: Seq[InetSocketAddress],
maxBatchSize: Int,
parallelism: Int,
storageLevel: StorageLevel
) extends Receiver[SparkFlumePollingEvent](storageLevel) with Logging {

lazy val channelFactoryExecutor =
Executors.newCachedThreadPool(new ThreadFactoryBuilder().setDaemon(true).
@@ -150,14 +147,6 @@ private[streaming] class FlumePollingReceiver(
}
}

override def store(dataItem: SparkFlumePollingEvent) {
// Not entirely sure store is thread-safe for all storage levels - so wrap it in synchronized
// This takes a performance hit, since the parallelism is useful only for pulling data now.
this.synchronized {
super.store(dataItem)
}
}

override def onStop(): Unit = {
logInfo("Shutting down Flume Polling Receiver")
receiverExecutor.shutdownNow()
@@ -176,6 +165,9 @@ private[streaming] class FlumePollingReceiver(
private class FlumeConnection(val transceiver: NettyTransceiver,
val client: SparkFlumeProtocol.Callback)

/**
* Companion object of [[SparkFlumePollingEvent]]
*/
private[streaming] object SparkFlumePollingEvent {
def fromSparkSinkEvent(in: SparkSinkEvent): SparkFlumePollingEvent = {
val event = new SparkFlumePollingEvent()
Expand All @@ -189,7 +181,7 @@ private[streaming] object SparkFlumePollingEvent {
* SparkSinkEvent is identical to AvroFlumeEvent, we need to create a new class and a wrapper
* around that to make it externalizable.
*/
class SparkFlumePollingEvent() extends Externalizable with Logging {
class SparkFlumePollingEvent extends Externalizable with Logging {
var event: SparkSinkEvent = new SparkSinkEvent()

/* De-serialize from bytes. */
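
The readExternal/writeExternal bodies are collapsed above, but the wrapper follows the standard Externalizable pattern: a public no-arg constructor plus explicit serialization of the wrapped record to bytes. A generic sketch of that pattern (not the actual SparkFlumePollingEvent code, which encodes the Avro headers and body):

import java.io.{Externalizable, ObjectInput, ObjectOutput}

// Generic Externalizable wrapper sketch: a length-prefixed byte payload.
class BytesWrapper() extends Externalizable {
  var payload: Array[Byte] = Array.empty[Byte]

  override def writeExternal(out: ObjectOutput): Unit = {
    out.writeInt(payload.length)
    out.write(payload)
  }

  override def readExternal(in: ObjectInput): Unit = {
    val length = in.readInt()
    payload = new Array[Byte](length)
    in.readFully(payload)
  }
}
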