[SPARK-23144][SS] Added console sink for continuous processing #20311

Closed · wants to merge 1 commit
@@ -19,13 +19,12 @@ package org.apache.spark.sql.execution.streaming

import java.util.Optional

-import scala.collection.JavaConverters._
-
import org.apache.spark.sql._
-import org.apache.spark.sql.execution.streaming.sources.ConsoleWriter
+import org.apache.spark.sql.execution.streaming.sources.{ConsoleContinuousWriter, ConsoleMicroBatchWriter, ConsoleWriter}
import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister}
import org.apache.spark.sql.sources.v2.{DataSourceV2, DataSourceV2Options}
-import org.apache.spark.sql.sources.v2.streaming.MicroBatchWriteSupport
+import org.apache.spark.sql.sources.v2.streaming.{ContinuousWriteSupport, MicroBatchWriteSupport}
+import org.apache.spark.sql.sources.v2.streaming.writer.ContinuousWriter
import org.apache.spark.sql.sources.v2.writer.DataSourceV2Writer
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.StructType
@@ -37,16 +36,25 @@ case class ConsoleRelation(override val sqlContext: SQLContext, data: DataFrame)

class ConsoleSinkProvider extends DataSourceV2
  with MicroBatchWriteSupport
+  with ContinuousWriteSupport
  with DataSourceRegister
  with CreatableRelationProvider {

  override def createMicroBatchWriter(
      queryId: String,
-      epochId: Long,
+      batchId: Long,
      schema: StructType,
      mode: OutputMode,
      options: DataSourceV2Options): Optional[DataSourceV2Writer] = {
-    Optional.of(new ConsoleWriter(epochId, schema, options))
+    Optional.of(new ConsoleMicroBatchWriter(batchId, schema, options))
  }

+  override def createContinuousWriter(
+      queryId: String,
+      schema: StructType,
+      mode: OutputMode,
+      options: DataSourceV2Options): Optional[ContinuousWriter] = {
+    Optional.of(new ConsoleContinuousWriter(schema, options))
+  }
+
  def createRelation(
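
Not part of the diff, but for orientation: a minimal sketch of how the two provider paths above would be exercised from user code, assuming the Spark 2.3-era structured streaming API. The rate source, the numRows/truncate options, and Trigger.Continuous come from this PR and its tests; the object name, master, and timeout are illustrative only.

// Sketch only: drives ConsoleSinkProvider through both planner paths.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

object ConsoleSinkSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("console-sink-sketch").getOrCreate()

    val rates = spark.readStream
      .format("rate")
      .option("rowsPerSecond", "5")
      .load()

    // Default micro-batch trigger: planned via createMicroBatchWriter -> ConsoleMicroBatchWriter.
    // val microBatchQuery = rates.writeStream.format("console").option("numRows", "10").start()

    // Continuous trigger: planned via createContinuousWriter -> ConsoleContinuousWriter.
    val continuousQuery = rates.writeStream
      .format("console")
      .option("numRows", "10")
      .option("truncate", "false")
      .trigger(Trigger.Continuous("1 second"))
      .start()

    continuousQuery.awaitTermination(10000L)  // let it print a few epochs
    continuousQuery.stop()
    spark.stop()
  }
}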
@@ -20,45 +20,85 @@ package org.apache.spark.sql.execution.streaming.sources

import org.apache.spark.internal.Logging
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.sources.v2.DataSourceV2Options
+import org.apache.spark.sql.sources.v2.streaming.writer.ContinuousWriter
import org.apache.spark.sql.sources.v2.writer.{DataSourceV2Writer, DataWriterFactory, WriterCommitMessage}
import org.apache.spark.sql.types.StructType

-/**
- * A [[DataSourceV2Writer]] that collects results to the driver and prints them in the console.
- * Generated by [[org.apache.spark.sql.execution.streaming.ConsoleSinkProvider]].
- *
- * This sink should not be used for production, as it requires sending all rows to the driver
- * and does not support recovery.
- */
-class ConsoleWriter(batchId: Long, schema: StructType, options: DataSourceV2Options)
-  extends DataSourceV2Writer with Logging {
+/** Common methods used to create writes for the console sink */
+trait ConsoleWriter extends Logging {

+  def options: DataSourceV2Options

  // Number of rows to display, by default 20 rows
-  private val numRowsToShow = options.getInt("numRows", 20)
+  protected val numRowsToShow = options.getInt("numRows", 20)

  // Truncate the displayed data if it is too long, by default it is true
-  private val isTruncated = options.getBoolean("truncate", true)
+  protected val isTruncated = options.getBoolean("truncate", true)

  assert(SparkSession.getActiveSession.isDefined)
-  private val spark = SparkSession.getActiveSession.get
+  protected val spark = SparkSession.getActiveSession.get

+  def createWriterFactory(): DataWriterFactory[Row] = PackedRowWriterFactory

-  override def createWriterFactory(): DataWriterFactory[Row] = PackedRowWriterFactory
+  def abort(messages: Array[WriterCommitMessage]): Unit = {}

-  override def commit(messages: Array[WriterCommitMessage]): Unit = synchronized {
-    val batch = messages.collect {
+  protected def printRows(
+      commitMessages: Array[WriterCommitMessage],
+      schema: StructType,
+      printMessage: String): Unit = {
+    val rows = commitMessages.collect {
      case PackedRowCommitMessage(rows) => rows
    }.flatten

    // scalastyle:off println
    println("-------------------------------------------")
-    println(s"Batch: $batchId")
+    println(printMessage)
    println("-------------------------------------------")
    // scalastyle:off println
-    spark.createDataFrame(
-      spark.sparkContext.parallelize(batch), schema)
+    spark
+      .createDataFrame(spark.sparkContext.parallelize(rows), schema)
      .show(numRowsToShow, isTruncated)
  }
+}


+/**
+ * A [[DataSourceV2Writer]] that collects results from a micro-batch query to the driver and
+ * prints them in the console. Created by
+ * [[org.apache.spark.sql.execution.streaming.ConsoleSinkProvider]].
+ *
+ * This sink should not be used for production, as it requires sending all rows to the driver
+ * and does not support recovery.
+ */
+class ConsoleMicroBatchWriter(batchId: Long, schema: StructType, val options: DataSourceV2Options)
+  extends DataSourceV2Writer with ConsoleWriter {

+  override def commit(messages: Array[WriterCommitMessage]): Unit = {
+    printRows(messages, schema, s"Batch: $batchId")

  [Review comment from a contributor, attached to the line above]
  Not part of this PR - but can you envision scenarios where microbatch.commit(messages) would be implemented differently than continuous.commit(batchId, messages)? Seeing this in context made me realize there's an obvious strategy to unify the streaming writer APIs.

+  }

+  override def toString(): String = {
+    s"ConsoleMicroBatchWriter[numRows=$numRowsToShow, truncate=$isTruncated]"
+  }
+}

-  override def abort(messages: Array[WriterCommitMessage]): Unit = {}

-  override def toString(): String = s"ConsoleWriter[numRows=$numRowsToShow, truncate=$isTruncated]"

+/**
+ * A [[DataSourceV2Writer]] that collects results from a continuous query to the driver and
+ * prints them in the console. Created by
+ * [[org.apache.spark.sql.execution.streaming.ConsoleSinkProvider]].
+ *
+ * This sink should not be used for production, as it requires sending all rows to the driver
+ * and does not support recovery.
+ */
+class ConsoleContinuousWriter(schema: StructType, val options: DataSourceV2Options)
+  extends ContinuousWriter with ConsoleWriter {

+  override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {
+    printRows(messages, schema, s"Continuous processing epoch $epochId")
+  }

+  override def toString(): String = {
+    s"ConsoleContinuousWriter[numRows=$numRowsToShow, truncate=$isTruncated]"
+  }
}
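
On the review comment above: purely as an illustration (nothing below exists in the code this PR touches, and the trait name is invented), the unification the reviewer hints at could be a single commit signature shared by both execution modes, with the micro-batch path passing its batch id as the epoch id.

// Hypothetical sketch of a unified streaming writer interface; not part of this PR.
import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage

trait UnifiedStreamWriter {
  // Micro-batch execution would pass its batch id here, continuous execution its epoch id,
  // letting ConsoleMicroBatchWriter and ConsoleContinuousWriter collapse into one class.
  def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit
  def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit
}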
@@ -19,13 +19,15 @@ package org.apache.spark.sql.execution.streaming.sources

import java.io.ByteArrayOutputStream

+import org.scalatest.time.SpanSugar._

import org.apache.spark.sql.execution.streaming.MemoryStream
-import org.apache.spark.sql.streaming.StreamTest
+import org.apache.spark.sql.streaming.{StreamTest, Trigger}

class ConsoleWriterSuite extends StreamTest {
  import testImplicits._

-  test("console") {
+  test("microbatch - default") {
    val input = MemoryStream[Int]

    val captured = new ByteArrayOutputStream()
@@ -77,7 +79,7 @@ class ConsoleWriterSuite extends StreamTest {
        |""".stripMargin)
  }

-  test("console with numRows") {
+  test("microbatch - with numRows") {
    val input = MemoryStream[Int]

    val captured = new ByteArrayOutputStream()
@@ -106,7 +108,7 @@ class ConsoleWriterSuite extends StreamTest {
        |""".stripMargin)
  }

-  test("console with truncation") {
+  test("microbatch - truncation") {
    val input = MemoryStream[String]

    val captured = new ByteArrayOutputStream()
@@ -132,4 +134,20 @@
        |
        |""".stripMargin)
  }

+  test("continuous - default") {
+    val captured = new ByteArrayOutputStream()
+    Console.withOut(captured) {
+      val input = spark.readStream
+        .format("rate")
+        .option("numPartitions", "1")
+        .option("rowsPerSecond", "5")
+        .load()
+        .select('value)

+      val query = input.writeStream.format("console").trigger(Trigger.Continuous(200)).start()
+      assert(query.isActive)
+      query.stop()
+    }
+  }
}
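
A natural follow-up, not included in this PR, would be to cover the writer options on the continuous path as well. A sketch of such a test, modeled directly on the existing "microbatch - with numRows" and "continuous - default" cases and meant to live inside this suite:

  // Hypothetical extra test: only checks that numRows is accepted and the query starts,
  // since continuous epochs fire on a timer and the printed output is not deterministic here.
  test("continuous - with numRows") {
    val captured = new ByteArrayOutputStream()
    Console.withOut(captured) {
      val input = spark.readStream
        .format("rate")
        .option("numPartitions", "1")
        .option("rowsPerSecond", "5")
        .load()
        .select('value)

      val query = input.writeStream
        .format("console")
        .option("numRows", "5")
        .trigger(Trigger.Continuous(200))
        .start()
      assert(query.isActive)
      query.stop()
    }
  }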