Commit 28af040: address comments from @zsxwing

tcondie committed Feb 14, 2017 (1 parent: 081ea24)
Showing 1 changed file with 13 additions and 11 deletions: docs/structured-streaming-kafka-integration.md

### Creating a Kafka Source Batch
If you have a use case that is better suited to batch processing,
you can create a Dataset/DataFrame for a defined range of offsets.

<div class="codetabs">
<div data-lang="scala" markdown="1">
{% highlight scala %}

// Subscribe to 1 topic defaults to the earliest and latest offsets
val ds1 = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()
ds1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

// Subscribe to multiple topics, specifying explicit Kafka offsets
// In the offsets json below, -2 refers to the earliest offset and -1 to the latest.
val ds2 = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1,topic2")
  .option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""")
  .option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""")
  .load()
ds2.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

// Subscribe to a pattern, at the earliest and latest offsets
val ds3 = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribePattern", "topic.*")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()
ds3.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

{% endhighlight %}
</div>
<div data-lang="java" markdown="1">
{% highlight java %}

// Subscribe to 1 topic defaults to the earliest and latest offsets
Dataset<Row> ds1 = spark
  .read()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load()
ds1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.load();
ds1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");

// Subscribe to multiple topics, specifying explicit Kafka offsets
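// In the offsets json below, -2 refers to the earliest offset and -1 to the latest.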
Dataset<Row> ds2 = spark
  .read()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1,topic2")
.option("startingOffsets", "{\"topic1\":{\"0\":23,\"1\":-2},\"topic2\":{\"0\":-2}}")
.option("endingOffsets", "{\"topic1\":{\"0\":50,\"1\":-1},\"topic2\":{\"0\":-1}}")
.load()
ds2.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.load();
ds2.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");

// Subscribe to a pattern, at the earliest and latest offsets
Dataset<Row> ds3 = spark
  .read()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribePattern", "topic.*")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.load()
ds3.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.load();
ds3.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");

{% endhighlight %}
</div>
<div data-lang="python" markdown="1">
{% highlight python %}

# Subscribe to 1 topic defaults to the earliest and latest offsets
ds1 = spark \
  .read \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1") \
  .load()
ds1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

# Subscribe to multiple topics, specifying explicit Kafka offsets
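# In the offsets json below, -2 refers to the earliest offset and -1 to the latest.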
ds2 = spark \
  .read \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1,topic2") \
  .option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""") \
  .option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""") \
  .load()
ds2.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

# Subscribe to a pattern, at the earliest and latest offsets
ds3 = spark \
  .read \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribePattern", "topic.*") \
  .option("startingOffsets", "earliest") \
  .option("endingOffsets", "latest") \
  .load()
ds3.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

{% endhighlight %}
</div>
</div>

The following configurations are optional:

<table class="table">
<tr><th>Option</th><th>value</th><th>default</th><th>query type</th><th>meaning</th></tr>
<tr>
<td>endingOffsets</td>
<td>latest or json string
{"topicA":{"0":23,"1":-1},"topicB":{"0":-1}}
</td>
<td>latest</td>
<td>batch only</td>
<td>The end point when a batch query is ended, either "latest", which refers to the latest
offsets, or a json string specifying an ending offset for each TopicPartition. In the json, -1
as an offset can be used to refer to latest, and -2 (earliest) as an offset is not allowed.</td>
</tr>
</table>
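Putting the pieces together, here is a minimal end-to-end sketch of a batch query over a
defined range of offsets. It is an illustration rather than an example from the guide: it
assumes the spark-sql-kafka-0-10 artifact is on the classpath, and the application name and
output path are hypothetical.

{% highlight scala %}
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("KafkaBatchSlice").getOrCreate()

// Read a bounded slice of topic1: partition 0 from offset 23 to 50,
// partition 1 from the earliest offset (-2) to the latest (-1).
val slice = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .option("startingOffsets", """{"topic1":{"0":23,"1":-2}}""")
  .option("endingOffsets", """{"topic1":{"0":50,"1":-1}}""")
  .load()

// key and value arrive as binary columns; cast them before use,
// then persist the decoded records.
slice.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .write
  .format("parquet")
  .save("/tmp/topic1-slice")  // hypothetical output path
{% endhighlight %}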
