diff --git a/docs/structured-streaming-kafka-integration.md b/docs/structured-streaming-kafka-integration.md index bb0b9e673deb3..f19954beaebf2 100644 --- a/docs/structured-streaming-kafka-integration.md +++ b/docs/structured-streaming-kafka-integration.md @@ -204,32 +204,32 @@ ds3.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)"); # Subscribe to 1 topic defaults to the earliest and latest offsets ds1 = spark \ - .read - .format("kafka") - .option("kafka.bootstrap.servers", "host1:port1,host2:port2") - .option("subscribe", "topic1") + .read \ + .format("kafka") \ + .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \ + .option("subscribe", "topic1") \ .load() ds1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") # Subscribe to multiple topics, specifying explicit Kafka offsets ds2 = spark \ - .read - .format("kafka") - .option("kafka.bootstrap.servers", "host1:port1,host2:port2") - .option("subscribe", "topic1,topic2") - .option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""") - .option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""") + .read \ + .format("kafka") \ + .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \ + .option("subscribe", "topic1,topic2") \ + .option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""") \ + .option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""") \ .load() ds2.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") # Subscribe to a pattern, at the earliest and latest offsets ds3 = spark \ - .read - .format("kafka") - .option("kafka.bootstrap.servers", "host1:port1,host2:port2") - .option("subscribePattern", "topic.*") - .option("startingOffsets", "earliest") - .option("endingOffsets", "latest") + .read \ + .format("kafka") \ + .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \ + .option("subscribePattern", "topic.*") \ + .option("startingOffsets", "earliest") \ + .option("endingOffsets", "latest") \ .load() ds3.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")