diff --git a/docs/structured-streaming-kafka-integration.md b/docs/structured-streaming-kafka-integration.md
index 953c3a4d15559..bb0b9e673deb3 100644
--- a/docs/structured-streaming-kafka-integration.md
+++ b/docs/structured-streaming-kafka-integration.md
@@ -120,6 +120,8 @@ ds3.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
### Creating a Kafka Source Batch
+If you have a use case that is better suited to batch processing,
+you can create a Dataset/DataFrame for a defined range of offsets.
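
As a point of reference, here is a minimal Scala sketch of the batch read this new prose describes (the `spark` session, broker addresses, and topic name are placeholders, and the `spark-sql-kafka-0-10` artifact is assumed to be on the classpath):

{% highlight scala %}
// Batch read over a bounded range of Kafka offsets: from the earliest
// available offsets to the latest offsets at the time the query runs.
// Placeholder brokers and topic; adjust for your cluster.
val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
{% endhighlight %}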
@@ -170,8 +172,8 @@ Dataset<Row> ds1 = spark
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
- .load()
-ds1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+ .load();
+ds1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
// Subscribe to multiple topics, specifying explicit Kafka offsets
Dataset<Row> ds2 = spark
@@ -181,8 +183,8 @@ Dataset ds2 = spark
.option("subscribe", "topic1,topic2")
.option("startingOffsets", "{\"topic1\":{\"0\":23,\"1\":-2},\"topic2\":{\"0\":-2}}")
.option("endingOffsets", "{\"topic1\":{\"0\":50,\"1\":-1},\"topic2\":{\"0\":-1}}")
- .load()
-ds2.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+ .load();
+ds2.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
// Subscribe to a pattern, at the earliest and latest offsets
Dataset<Row> ds3 = spark
@@ -192,8 +194,8 @@ Dataset ds3 = spark
.option("subscribePattern", "topic.*")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
- .load()
-ds3.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+ .load();
+ds3.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
{% endhighlight %}
@@ -201,7 +203,7 @@ ds3.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
{% highlight python %}
# Subscribe to 1 topic defaults to the earliest and latest offsets
-ds1 = spark
-  .read
-  .format("kafka")
-  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+ds1 = spark \
+  .read \
+  .format("kafka") \
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
@@ -210,7 +212,7 @@ ds1 = spark
ds1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
# Subscribe to multiple topics, specifying explicit Kafka offsets
-ds2 = spark
-  .read
-  .format("kafka")
-  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+ds2 = spark \
+  .read \
+  .format("kafka") \
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
@@ -221,7 +223,7 @@ ds2 = spark
ds2.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
# Subscribe to a pattern, at the earliest and latest offsets
-ds3 = spark
-  .read
-  .format("kafka")
-  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+ds3 = spark \
+  .read \
+  .format("kafka") \
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
@@ -326,8 +328,8 @@ The following configurations are optional:
<td>latest</td>
<td>batch only</td>
-  <td>The end point when a batch query is started, either "latest" which is just from the latest
-  offsets, or a json string specifying a starting offset for each TopicPartition. In the json, -1
+  <td>The end point when a batch query is ended, either "latest" which just refers to the latest
+  offsets, or a json string specifying an ending offset for each TopicPartition. In the json, -1
as an offset can be used to refer to latest, and -2 (earliest) as an offset is not allowed.</td>
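
To make the -1 convention concrete, here is a hedged Scala sketch (the topic name, partition numbers, and offset 50 are illustrative): partition 0 of topic1 stops at offset 50, while partition 1 reads through whatever its latest offset is at query time; passing -2 in `endingOffsets` would be rejected.

{% highlight scala %}
// Illustrative endingOffsets JSON: partition 0 ends at offset 50,
// partition 1 ends at the latest available offset (-1).
val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .option("endingOffsets", """{"topic1":{"0":50,"1":-1}}""")
  .load()
{% endhighlight %}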