diff --git a/docs/structured-streaming-kafka-integration.md b/docs/structured-streaming-kafka-integration.md
index 8b2f51a378dc7..522e669568678 100644
--- a/docs/structured-streaming-kafka-integration.md
+++ b/docs/structured-streaming-kafka-integration.md
@@ -119,6 +119,124 @@ ds3.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+### Creating a Kafka Source Batch
+If you have a use case that is better suited to batch processing,
+you can create a Dataset/DataFrame for a defined range of offsets,
+using `read` in place of `readStream`. In the JSON offset specifications
+below, `-2` refers to the earliest offset and `-1` to the latest.
+
+{% highlight scala %}
+
+// Subscribe to 1 topic, defaulting to the earliest and latest offsets
+val ds1 = spark
+ .read
+ .format("kafka")
+ .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+ .option("subscribe", "topic1")
+ .load()
+ds1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+ .as[(String, String)]
+
+// Subscribe to multiple topics, specifying explicit Kafka offsets
+val ds2 = spark
+ .read
+ .format("kafka")
+ .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+ .option("subscribe", "topic1,topic2")
+ .option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""")
+ .option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""")
+ .load()
+ds2.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+ .as[(String, String)]
+
+// Subscribe to a pattern, at the earliest and latest offsets
+val ds3 = spark
+ .read
+ .format("kafka")
+ .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+ .option("subscribePattern", "topic.*")
+ .option("startingOffsets", "earliest")
+ .option("endingOffsets", "latest")
+ .load()
+ds3.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+ .as[(String, String)]
+
+{% endhighlight %}
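+
+Because these are batch queries, the loaded Dataset can be consumed with
+ordinary batch operations. As a minimal sketch (reusing `ds3` from the
+example above, and the `topic`/`partition` columns of the source schema
+described below), counting the fetched records per topic and partition:
+
+{% highlight scala %}
+
+// Count the records read by the batch query, per topic/partition.
+// Reuses ds3 from the example above; topic and partition are columns
+// provided by the Kafka source.
+ds3.groupBy("topic", "partition")
+  .count()
+  .show()
+
+{% endhighlight %}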
+
+
+{% highlight java %}
+
+// Subscribe to 1 topic, defaulting to the earliest and latest offsets
+Dataset<Row> ds1 = spark
+ .read()
+ .format("kafka")
+ .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+ .option("subscribe", "topic1")
+ .load();
+ds1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
+
+// Subscribe to multiple topics, specifying explicit Kafka offsets
+Dataset<Row> ds2 = spark
+ .read()
+ .format("kafka")
+ .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+ .option("subscribe", "topic1,topic2")
+ .option("startingOffsets", "{\"topic1\":{\"0\":23,\"1\":-2},\"topic2\":{\"0\":-2}}")
+ .option("endingOffsets", "{\"topic1\":{\"0\":50,\"1\":-1},\"topic2\":{\"0\":-1}}")
+ .load();
+ds2.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
+
+// Subscribe to a pattern, at the earliest and latest offsets
+Dataset<Row> ds3 = spark
+ .read()
+ .format("kafka")
+ .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+ .option("subscribePattern", "topic.*")
+ .option("startingOffsets", "earliest")
+ .option("endingOffsets", "latest")
+ .load();
+ds3.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
+
+{% endhighlight %}
+
+
+{% highlight python %}
+
+# Subscribe to 1 topic, defaulting to the earliest and latest offsets
+ds1 = spark \
+ .read \
+ .format("kafka") \
+ .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
+ .option("subscribe", "topic1") \
+ .load()
+ds1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+
+# Subscribe to multiple topics, specifying explicit Kafka offsets
+ds2 = spark \
+ .read \
+ .format("kafka") \
+ .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
+ .option("subscribe", "topic1,topic2") \
+ .option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""") \
+ .option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""") \
+ .load()
+ds2.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+
+# Subscribe to a pattern, at the earliest and latest offsets
+ds3 = spark \
+ .read \
+ .format("kafka") \
+ .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
+ .option("subscribePattern", "topic.*") \
+ .option("startingOffsets", "earliest") \
+ .option("endingOffsets", "latest") \
+ .load()
+ds3.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+
+{% endhighlight %}
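+
+Since the result of a batch read is a regular DataFrame, it can also be
+written out with the standard DataFrameWriter API. A sketch (the parquet
+output path below is hypothetical):
+
+{% highlight scala %}
+
+// Persist the decoded key/value pairs from the batch read.
+// The output path is hypothetical; substitute your own location.
+ds1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+  .write
+  .format("parquet")
+  .save("/path/to/kafka-batch-output")
+
+{% endhighlight %}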
+
Each row in the source has the following schema: