Spark SQL can automatically infer the schema of a JSON dataset and load it as a DataFrame.
-This conversion can be done using one of two methods in a SQLContext :
+This conversion can be done using one of two methods in a `SQLContext`:
* `jsonFile` - loads data from a directory of JSON files where each line of the files is a JSON object.
* `jsonRDD` - loads data from an existing RDD where each element of the RDD is a string containing a JSON object.
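For example, a minimal Scala sketch of both methods might look like the following (the file path and sample record are illustrative):

{% highlight scala %}
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// jsonFile: each line of the file must be a self-contained JSON object.
val people = sqlContext.jsonFile("examples/src/main/resources/people.json")
people.printSchema()

// jsonRDD: each element of the RDD must be a string containing a JSON object.
val anotherPeopleRDD = sc.parallelize(
  """{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
val anotherPeople = sqlContext.jsonRDD(anotherPeopleRDD)
{% endhighlight %}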
@@ -1056,7 +1179,7 @@ DataFrame anotherPeople = sqlContext.jsonRDD(anotherPeopleRDD);
Spark SQL can automatically infer the schema of a JSON dataset and load it as a DataFrame.
-This conversion can be done using one of two methods in a SQLContext:
+This conversion can be done using one of two methods in a `SQLContext`:
* `jsonFile` - loads data from a directory of JSON files where each line of the files is a JSON object.
* `jsonRDD` - loads data from an existing RDD where each element of the RDD is a string containing a JSON object.
@@ -1085,7 +1208,7 @@ people.printSchema()
# Register this DataFrame as a table.
people.registerTempTable("people")
-# SQL statements can be run by using the sql methods provided by sqlContext.
+# SQL statements can be run by using the sql methods provided by `sqlContext`.
teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
# Alternatively, a DataFrame can be created for a JSON dataset represented by
@@ -1131,7 +1254,7 @@ Configuration of Hive is done by placing your `hive-site.xml` file in `conf/`.
When working with Hive one must construct a `HiveContext`, which inherits from `SQLContext`, and
adds support for finding tables in the MetaStore and writing queries using HiveQL. Users who do
-not have an existing Hive deployment can still create a HiveContext. When not configured by the
+not have an existing Hive deployment can still create a `HiveContext`. When not configured by the
hive-site.xml, the context automatically creates `metastore_db` and `warehouse` in the current
directory.
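For example, a minimal Scala sketch (assuming an existing `SparkContext` named `sc`; the table and file names are illustrative):

{% highlight scala %}
// sc is an existing SparkContext; a Hive installation is not required.
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)

hiveContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
hiveContext.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

// Queries are expressed in HiveQL.
hiveContext.sql("FROM src SELECT key, value").collect().foreach(println)
{% endhighlight %}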
@@ -1318,7 +1441,7 @@ Spark SQL can cache tables using an in-memory columnar format by calling `sqlCon
Then Spark SQL will scan only required columns and will automatically tune compression to minimize
memory usage and GC pressure. You can call `sqlContext.uncacheTable("tableName")` to remove the table from memory.
-Configuration of in-memory caching can be done using the `setConf` method on SQLContext or by running
+Configuration of in-memory caching can be done using the `setConf` method on `SQLContext` or by running
`SET key=value` commands using SQL.
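For example, a short Scala sketch of both styles (the keys `spark.sql.inMemoryColumnarStorage.batchSize` and `spark.sql.inMemoryColumnarStorage.compressed` are used here for illustration):

{% highlight scala %}
// Cache a table in the in-memory columnar format.
sqlContext.cacheTable("tableName")

// Configure caching through the SQLContext...
sqlContext.setConf("spark.sql.inMemoryColumnarStorage.batchSize", "10000")

// ...or equivalently with a SQL SET command.
sqlContext.sql("SET spark.sql.inMemoryColumnarStorage.compressed=true")

// Remove the table from memory when it is no longer needed.
sqlContext.uncacheTable("tableName")
{% endhighlight %}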
@@ -1429,10 +1552,10 @@ Configuration of Hive is done by placing your `hive-site.xml` file in `conf/`.
You may also use the beeline script that comes with Hive.
-Thrift JDBC server also supports sending thrift RPC messages over HTTP transport.
-Use the following setting to enable HTTP mode as system property or in `hive-site.xml` file in `conf/`:
+Thrift JDBC server also supports sending thrift RPC messages over HTTP transport.
+Use the following setting to enable HTTP mode as system property or in `hive-site.xml` file in `conf/`:
- hive.server2.transport.mode - Set this to value: http
+ hive.server2.transport.mode - Set this to value: http
hive.server2.thrift.http.port - HTTP port number to listen on; default is 10001
hive.server2.http.endpoint - HTTP endpoint; default is cliservice
@@ -1506,7 +1629,7 @@ When using function inside of the DSL (now replaced with the `DataFrame` API) us
Spark 1.3 removes the type aliases that were present in the base sql package for `DataType`. Users
should instead import the classes in `org.apache.spark.sql.types`.
-#### UDF Registration Moved to sqlContext.udf (Java & Scala)
+#### UDF Registration Moved to `sqlContext.udf` (Java & Scala)
Functions that are used to register UDFs, either for use in the DataFrame DSL or SQL, have been
moved into the `udf` object in `SQLContext`.
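For example, a brief sketch (the UDF name `strLen` is just for illustration):

{% highlight scala %}
sqlContext.udf.register("strLen", (s: String) => s.length)

// The registered UDF can then be used in SQL statements.
val results = sqlContext.sql("SELECT strLen(name) FROM people")
{% endhighlight %}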
diff --git a/docs/streaming-flume-integration.md b/docs/streaming-flume-integration.md
index 40e17246fea83..c8ab146bcae0a 100644
--- a/docs/streaming-flume-integration.md
+++ b/docs/streaming-flume-integration.md
@@ -5,6 +5,8 @@ title: Spark Streaming + Flume Integration Guide
[Apache Flume](https://flume.apache.org/) is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data. Here we explain how to configure Flume and Spark Streaming to receive data from Flume. There are two approaches to this.
+**Python API** Flume is not yet available in the Python API.
+
## Approach 1: Flume-style Push-based Approach
Flume is designed to push data between Flume agents. In this approach, Spark Streaming essentially sets up a receiver that acts as an Avro agent for Flume, to which Flume can push the data. Here are the configuration steps.
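As a preview of the Spark Streaming side, a rough Scala sketch of creating the Flume input stream looks like the following (the hostname and port are placeholders for the address the receiver binds to):

    import org.apache.spark.streaming.flume._

    // The receiver binds to this hostname/port; Flume's Avro sink must push to the same address.
    val flumeStream = FlumeUtils.createStream(streamingContext, "receiver-hostname", 33333)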
diff --git a/docs/streaming-kafka-integration.md b/docs/streaming-kafka-integration.md
index 77c0abbbacbd0..64714f0b799fc 100644
--- a/docs/streaming-kafka-integration.md
+++ b/docs/streaming-kafka-integration.md
@@ -2,58 +2,155 @@
layout: global
title: Spark Streaming + Kafka Integration Guide
---
-[Apache Kafka](http://kafka.apache.org/) is publish-subscribe messaging rethought as a distributed, partitioned, replicated commit log service. Here we explain how to configure Spark Streaming to receive data from Kafka.
+[Apache Kafka](http://kafka.apache.org/) is publish-subscribe messaging rethought as a distributed, partitioned, replicated commit log service. Here we explain how to configure Spark Streaming to receive data from Kafka. There are two approaches to this - the old approach using Receivers and Kafka's high-level API, and a new experimental approach (introduced in Spark 1.3) without using Receivers. They have different programming models, performance characteristics, and semantic guarantees, so read on for more details.
-1. **Linking:** In your SBT/Maven project definition, link your streaming application against the following artifact (see [Linking section](streaming-programming-guide.html#linking) in the main programming guide for further information).
+## Approach 1: Receiver-based Approach
+This approach uses a Receiver to receive the data. The Receiver is implemented using Kafka's high-level consumer API. As with all receivers, the data received from Kafka through a Receiver is stored in Spark executors, and the jobs launched by Spark Streaming then process the data.
+
+However, under the default configuration, this approach can lose data under failures (see [receiver reliability](streaming-programming-guide.html#receiver-reliability)). To ensure zero data loss, you have to additionally enable Write Ahead Logs in Spark Streaming (introduced in Spark 1.2). This synchronously saves all the received Kafka data into write ahead logs on a distributed file system (e.g., HDFS), so that all the data can be recovered on failure. See the [Deploying section](streaming-programming-guide.html#deploying-applications) in the streaming programming guide for more details on Write Ahead Logs.
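For instance, a rough Scala sketch of enabling the write ahead log (the application name and checkpoint directory are placeholders):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val sparkConf = new SparkConf()
      .setAppName("KafkaReceiverApp")
      .set("spark.streaming.receiver.writeAheadLog.enable", "true")
    val streamingContext = new StreamingContext(sparkConf, Seconds(2))
    // The logs are written under the checkpoint directory, so it should be on a
    // fault-tolerant file system such as HDFS.
    streamingContext.checkpoint("hdfs://namenode:8020/spark/checkpoint")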
+
+Next, we discuss how to use this approach in your streaming application.
+
+1. **Linking:** For Scala/Java applications using SBT/Maven project definitions, link your streaming application with the following artifact (see [Linking section](streaming-programming-guide.html#linking) in the main programming guide for further information).
groupId = org.apache.spark
artifactId = spark-streaming-kafka_{{site.SCALA_BINARY_VERSION}}
version = {{site.SPARK_VERSION_SHORT}}
-2. **Programming:** In the streaming application code, import `KafkaUtils` and create input DStream as follows.
+ For Python applications, you will have to add the library above and its dependencies when deploying your application. See the *Deploying* subsection below.
+
+2. **Programming:** In the streaming application code, import `KafkaUtils` and create an input DStream as follows.
import org.apache.spark.streaming.kafka._
- val kafkaStream = KafkaUtils.createStream(
- streamingContext, [zookeeperQuorum], [group id of the consumer], [per-topic number of Kafka partitions to consume])
+ val kafkaStream = KafkaUtils.createStream(streamingContext,
+ [ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])
- See the [API docs](api/scala/index.html#org.apache.spark.streaming.kafka.KafkaUtils$)
+ You can also specify the key and value classes and their corresponding decoder classes using variations of `createStream`. See the [API docs](api/scala/index.html#org.apache.spark.streaming.kafka.KafkaUtils$)
and the [example]({{site.SPARK_GITHUB_URL}}/blob/master/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala).
import org.apache.spark.streaming.kafka.*;
- JavaPairReceiverInputDStream<String, String> kafkaStream = KafkaUtils.createStream(
- streamingContext, [zookeeperQuorum], [group id of the consumer], [per-topic number of Kafka partitions to consume]);
+ JavaPairReceiverInputDStream<String, String> kafkaStream =
+ KafkaUtils.createStream(streamingContext,
+ [ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume]);
- See the [API docs](api/java/index.html?org/apache/spark/streaming/kafka/KafkaUtils.html)
+ You can also specify the key and value classes and their corresponding decoder classes using variations of `createStream`. See the [API docs](api/java/index.html?org/apache/spark/streaming/kafka/KafkaUtils.html)
and the [example]({{site.SPARK_GITHUB_URL}}/blob/master/examples/scala-2.10/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java).
+
+
+
+ from pyspark.streaming.kafka import KafkaUtils
+
+ kafkaStream = KafkaUtils.createStream(streamingContext, \
+ [ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])
+
+ By default, the Python API will decode Kafka data as UTF-8 encoded strings. You can specify a custom decoding function to decode the byte arrays in Kafka records into an arbitrary data type. See the [API docs](api/python/pyspark.streaming.html#pyspark.streaming.kafka.KafkaUtils)
+ and the [example]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/python/streaming/kafka_wordcount.py).
- *Points to remember:*
+ **Points to remember:**
- Topic partitions in Kafka do not correlate to the partitions of the RDDs generated in Spark Streaming. So increasing the number of topic-specific partitions in `KafkaUtils.createStream()` only increases the number of threads a single receiver uses to consume those topics. It does not increase the parallelism of Spark in processing the data. Refer to the main document for more information on that.
- Multiple Kafka input DStreams can be created with different groups and topics for parallel receiving of data using multiple receivers.
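   A rough sketch of this pattern in Scala (names such as `zkQuorum`, `consumerGroup`, and `topicMap` are placeholders):

        val numStreams = 3
        val kafkaStreams = (1 to numStreams).map { _ =>
          KafkaUtils.createStream(streamingContext, zkQuorum, consumerGroup, topicMap)
        }
        // Union the streams so that downstream processing sees a single DStream.
        val unifiedStream = streamingContext.union(kafkaStreams)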
-3. **Deploying:** Package `spark-streaming-kafka_{{site.SCALA_BINARY_VERSION}}` and its dependencies (except `spark-core_{{site.SCALA_BINARY_VERSION}}` and `spark-streaming_{{site.SCALA_BINARY_VERSION}}` which are provided by `spark-submit`) into the application JAR. Then use `spark-submit` to launch your application (see [Deploying section](streaming-programming-guide.html#deploying-applications) in the main programming guide).
-
-Note that the Kafka receiver used by default is an
-[*unreliable* receiver](streaming-programming-guide.html#receiver-reliability) section in the
-programming guide). In Spark 1.2, we have added an experimental *reliable* Kafka receiver that
-provides stronger
-[fault-tolerance guarantees](streaming-programming-guide.html#fault-tolerance-semantics) of zero
-data loss on failures. This receiver is automatically used when the write ahead log
-(also introduced in Spark 1.2) is enabled
-(see [Deployment](#deploying-applications.html) section in the programming guide). This
-may reduce the receiving throughput of individual Kafka receivers compared to the unreliable
-receivers, but this can be corrected by running
-[more receivers in parallel](streaming-programming-guide.html#level-of-parallelism-in-data-receiving)
-to increase aggregate throughput. Additionally, it is recommended that the replication of the
-received data within Spark be disabled when the write ahead log is enabled as the log is already stored
-in a replicated storage system. This can be done by setting the storage level for the input
-stream to `StorageLevel.MEMORY_AND_DISK_SER` (that is, use
+ - If you have enabled Write Ahead Logs with a replicated file system like HDFS, the received data is already being replicated in the log. Hence, set the storage level for the input stream to `StorageLevel.MEMORY_AND_DISK_SER` (that is, use
`KafkaUtils.createStream(..., StorageLevel.MEMORY_AND_DISK_SER)`).
+
+3. **Deploying:** As with any Spark application, `spark-submit` is used to launch your application. However, the details are slightly different for Scala/Java applications and Python applications.
+
+ For Scala and Java applications, if you are using SBT or Maven for project management, then package `spark-streaming-kafka_{{site.SCALA_BINARY_VERSION}}` and its dependencies into the application JAR. Make sure `spark-core_{{site.SCALA_BINARY_VERSION}}` and `spark-streaming_{{site.SCALA_BINARY_VERSION}}` are marked as `provided` dependencies as those are already present in a Spark installation. Then use `spark-submit` to launch your application (see [Deploying section](streaming-programming-guide.html#deploying-applications) in the main programming guide).
+
+ For Python applications which lack SBT/Maven project management, `spark-streaming-kafka_{{site.SCALA_BINARY_VERSION}}` and its dependencies can be directly added to `spark-submit` using `--packages` (see [Application Submission Guide](submitting-applications.html)). That is,
+
+ ./bin/spark-submit --packages org.apache.spark:spark-streaming-kafka_{{site.SCALA_BINARY_VERSION}}:{{site.SPARK_VERSION_SHORT}} ...
+
+ Alternatively, you can also download the JAR of the Maven artifact `spark-streaming-kafka-assembly` from the
+ [Maven repository](http://search.maven.org/#search|ga|1|a%3A%22spark-streaming-kafka-assembly_2.10%22%20AND%20v%3A%22{{site.SPARK_VERSION_SHORT}}%22) and add it to `spark-submit` with `--jars`.
+
+## Approach 2: Direct Approach (No Receivers)
+This new receiver-less "direct" approach was introduced in Spark 1.3 to ensure stronger end-to-end guarantees. Instead of using receivers to receive data, this approach periodically queries Kafka for the latest offsets in each topic+partition, and accordingly defines the offset ranges to process in each batch. When the jobs to process the data are launched, Kafka's simple consumer API is used to read the defined ranges of offsets from Kafka (similar to reading files from a file system). Note that this is an experimental feature in Spark 1.3 and is only available in the Scala and Java API.
+
+This approach has the following advantages over the receiver-based approach (i.e. Approach 1).
+
+- *Simplified Parallelism:* No need to create multiple input Kafka streams and union them. With `directStream`, Spark Streaming will create as many RDD partitions as there are Kafka partitions to consume, and they will all read data from Kafka in parallel. So there is a one-to-one mapping between Kafka and RDD partitions, which is easier to understand and tune.
+
+- *Efficiency:* Achieving zero-data loss in the first approach required the data to be stored in a Write Ahead Log, which further replicated the data. This is actually inefficient as the data effectively gets replicated twice - once by Kafka, and a second time by the Write Ahead Log. This second approach eliminates the problem as there is no receiver, and hence no need for Write Ahead Logs.
+
+- *Exactly-once semantics:* The first approach uses Kafka's high-level API to store consumed offsets in Zookeeper. This is traditionally the way to consume data from Kafka. While this approach (in combination with write ahead logs) can ensure zero data loss (i.e. at-least-once semantics), there is a small chance some records may get consumed twice under some failures. This occurs because of inconsistencies between the data reliably received by Spark Streaming and the offsets tracked by Zookeeper. Hence, in this second approach, we use the simple Kafka API that does not use Zookeeper, and offsets are tracked only by Spark Streaming within its checkpoints. This eliminates inconsistencies between Spark Streaming and Zookeeper/Kafka, and so each record is received by Spark Streaming effectively exactly once despite failures.
+
+Note that one disadvantage of this approach is that it does not update offsets in Zookeeper, hence Zookeeper-based Kafka monitoring tools will not show progress. However, you can access the offsets processed by this approach in each batch and update Zookeeper yourself (see below).
+
+Next, we discuss how to use this approach in your streaming application.
+
+1. **Linking:** This approach is supported only in Scala/Java applications. Link your SBT/Maven project with the following artifact (see [Linking section](streaming-programming-guide.html#linking) in the main programming guide for further information).
+
+ groupId = org.apache.spark
+ artifactId = spark-streaming-kafka_{{site.SCALA_BINARY_VERSION}}
+ version = {{site.SPARK_VERSION_SHORT}}
+
+2. **Programming:** In the streaming application code, import `KafkaUtils` and create an input DStream as follows.
+
+
+
+ import org.apache.spark.streaming.kafka._
+
+ val directKafkaStream = KafkaUtils.createDirectStream[
+ [key class], [value class], [key decoder class], [value decoder class] ](
+ streamingContext, [map of Kafka parameters], [set of topics to consume])
+
+ See the [API docs](api/scala/index.html#org.apache.spark.streaming.kafka.KafkaUtils$)
+ and the [example]({{site.SPARK_GITHUB_URL}}/blob/master/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala).
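   For instance, a concrete sketch with string keys and values (the broker addresses and topic names are placeholders):

        import kafka.serializer.StringDecoder

        val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")
        val topics = Set("topicA", "topicB")
        val directKafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
          streamingContext, kafkaParams, topics)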
+
+
+ import org.apache.spark.streaming.kafka.*;
+
+ JavaPairInputDStream<String, String> directKafkaStream =
+ KafkaUtils.createDirectStream(streamingContext,
+ [key class], [value class], [key decoder class], [value decoder class],
+ [map of Kafka parameters], [set of topics to consume]);
+
+ See the [API docs](api/java/index.html?org/apache/spark/streaming/kafka/KafkaUtils.html)
+ and the [example]({{site.SPARK_GITHUB_URL}}/blob/master/examples/scala-2.10/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java).
+
+
+
+
+ In the Kafka parameters, you must specify either `metadata.broker.list` or `bootstrap.servers`.
+ By default, it will start consuming from the latest offset of each Kafka partition. If you set configuration `auto.offset.reset` in Kafka parameters to `smallest`, then it will start consuming from the smallest offset.
+
+ You can also start consuming from any arbitrary offset using other variations of `KafkaUtils.createDirectStream`. Furthermore, if you want to access the Kafka offsets consumed in each batch, you can do the following.
+
+
+
+ directKafkaStream.foreachRDD { rdd =>
+ val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
+ // offsetRanges.length = # of Kafka partitions being consumed
+ ...
+ }
+
+
+ directKafkaStream.foreachRDD(
+ new Function<JavaPairRDD<String, String>, Void>() {
+ @Override
+ public Void call(JavaPairRDD<String, String> rdd) throws IOException {
+ OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
+ // offsetRanges.length = # of Kafka partitions being consumed
+ ...
+ return null;
+ }
+ }
+ );
+
+
+
+ You can use this to update Zookeeper yourself if you want Zookeeper-based Kafka monitoring tools to show progress of the streaming application.
+
+ Another thing to note is that since this approach does not use Receivers, the standard receiver-related [configurations](configuration.html) (that is, those of the form `spark.streaming.receiver.*`) will not apply to the input DStreams created by this approach (they will apply to other input DStreams though). Instead, use the `spark.streaming.kafka.*` [configurations](configuration.html). An important one is `spark.streaming.kafka.maxRatePerPartition`, which is the maximum rate at which each Kafka partition will be read by this direct API.
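   For example, a one-line sketch of capping the read rate (the value is illustrative and means at most 1000 records per second per partition):

        sparkConf.set("spark.streaming.kafka.maxRatePerPartition", "1000")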
+
+3. **Deploying:** Similar to the first approach, you can package `spark-streaming-kafka_{{site.SCALA_BINARY_VERSION}}` and its dependencies into the application JAR and then launch the application using `spark-submit`. Make sure `spark-core_{{site.SCALA_BINARY_VERSION}}` and `spark-streaming_{{site.SCALA_BINARY_VERSION}}` are marked as `provided` dependencies as those are already present in a Spark installation.
\ No newline at end of file
diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md
index 815c98713b738..6d6229625f3f9 100644
--- a/docs/streaming-programming-guide.md
+++ b/docs/streaming-programming-guide.md
@@ -189,15 +189,15 @@ Next, we want to count these words.
{% highlight java %}
// Count each word in each batch
-JavaPairDStream<String, Integer> pairs = words.map(
+JavaPairDStream<String, Integer> pairs = words.mapToPair(
  new PairFunction<String, String, Integer>() {
-    @Override public Tuple2<String, Integer> call(String s) throws Exception {
+    @Override public Tuple2<String, Integer> call(String s) {
      return new Tuple2<String, Integer>(s, 1);
    }
  });
JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey(
  new Function2<Integer, Integer, Integer>() {
-    @Override public Integer call(Integer i1, Integer i2) throws Exception {
+    @Override public Integer call(Integer i1, Integer i2) {
      return i1 + i2;
    }
  });
@@ -432,7 +432,7 @@ some of the common ones are as follows.
For an up-to-date list, please refer to the
-[Apache repository](http://search.maven.org/#search%7Cga%7C1%7Cg%3A%22org.apache.spark%22%20AND%20v%3A%22{{site.SPARK_VERSION_SHORT}}%22)
+[Maven repository](http://search.maven.org/#search%7Cga%7C1%7Cg%3A%22org.apache.spark%22%20AND%20v%3A%22{{site.SPARK_VERSION_SHORT}}%22)
for the full list of supported sources and artifacts.
***
@@ -662,8 +662,7 @@ methods for creating DStreams from files and Akka actors as input sources.
For simple text files, there is an easier method `streamingContext.textFileStream(dataDirectory)`. And file streams do not require running a receiver, hence they do not require allocating cores.
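For example, a brief Scala sketch of both variants (`dataDirectory` stands for a monitored directory path):

{% highlight scala %}
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

// Simple line-oriented text files.
val lines = streamingContext.textFileStream(dataDirectory)

// Arbitrary Hadoop input formats need key, value, and input format classes.
val records = streamingContext.fileStream[LongWritable, Text, TextInputFormat](dataDirectory)
{% endhighlight %}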
-**Python API** As of Spark 1.2,
-`fileStream` is not available in the Python API, only `textFileStream` is available.
+**Python API** `fileStream` is not available in the Python API, only `textFileStream` is available.
- **Streams based on Custom Actors:** DStreams can be created with data streams received through Akka
actors by using `streamingContext.actorStream(actorProps, actor-name)`. See the [Custom Receiver
@@ -682,8 +681,9 @@ for Java, and [StreamingContext](api/python/pyspark.streaming.html#pyspark.strea
### Advanced Sources
{:.no_toc}
-**Python API** As of Spark 1.2,
-these sources are not available in the Python API.
+**Python API** As of Spark 1.3,
+out of these sources, *only* Kafka is available in the Python API. We will add more advanced sources in the Python API in the future.
This category of sources requires interfacing with external non-Spark libraries, some of them with
complex dependencies (e.g., Kafka and Flume). Hence, to minimize issues related to version conflicts
@@ -723,6 +723,12 @@ and it in the classpath.
Some of these advanced sources are as follows.
+- **Kafka:** Spark Streaming {{site.SPARK_VERSION_SHORT}} is compatible with Kafka 0.8.1.1. See the [Kafka Integration Guide](streaming-kafka-integration.html) for more details.
+
+- **Flume:** Spark Streaming {{site.SPARK_VERSION_SHORT}} is compatible with Flume 1.4.0. See the [Flume Integration Guide](streaming-flume-integration.html) for more details.
+
+- **Kinesis:** See the [Kinesis Integration Guide](streaming-kinesis-integration.html) for more details.
+
- **Twitter:** Spark Streaming's TwitterUtils uses Twitter4j 3.0.3 to get the public stream of tweets using
[Twitter's Streaming API](https://dev.twitter.com/docs/streaming-apis). Authentication information
can be provided by any of the [methods](http://twitter4j.org/en/configuration.html) supported by
@@ -732,17 +738,10 @@ Some of these advanced sources are as follows.
([TwitterPopularTags]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterPopularTags.scala)
and [TwitterAlgebirdCMS]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdCMS.scala)).
-- **Flume:** Spark Streaming {{site.SPARK_VERSION_SHORT}} can received data from Flume 1.4.0. See the [Flume Integration Guide](streaming-flume-integration.html) for more details.
-
-- **Kafka:** Spark Streaming {{site.SPARK_VERSION_SHORT}} can receive data from Kafka 0.8.0. See the [Kafka Integration Guide](streaming-kafka-integration.html) for more details.
-
-- **Kinesis:** See the [Kinesis Integration Guide](streaming-kinesis-integration.html) for more details.
-
### Custom Sources
{:.no_toc}
-**Python API** As of Spark 1.2,
-these sources are not available in the Python API.
+**Python API** This is not yet supported in Python.
Input DStreams can also be created out of custom data sources. All you have to do is implement a
user-defined **receiver** (see next section to understand what that is) that can receive data from
@@ -846,7 +845,7 @@ Some of the common ones are as follows.
| |