From 3abe75c8db6c584e128dad7c45c10ac8a15af979 Mon Sep 17 00:00:00 2001 From: Jose Torres Date: Wed, 10 Jan 2018 15:19:28 -0800 Subject: [PATCH 01/16] basic writer --- .../sql/execution/streaming/console.scala | 62 ++++++------------- .../streaming/sources/ConsoleWriter.scala | 56 +++++++++++++++++ .../sources/PackedRowWriterFactory.scala | 50 +++++++++++++++ 3 files changed, 126 insertions(+), 42 deletions(-) create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWriter.scala create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/PackedRowWriterFactory.scala diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala index 71eaabe273fea..8969a645651d7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala @@ -17,58 +17,36 @@ package org.apache.spark.sql.execution.streaming -import org.apache.spark.internal.Logging -import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext} -import org.apache.spark.sql.execution.SQLExecution -import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister, StreamSinkProvider} -import org.apache.spark.sql.streaming.OutputMode -import org.apache.spark.sql.types.StructType - -class ConsoleSink(options: Map[String, String]) extends Sink with Logging { - // Number of rows to display, by default 20 rows - private val numRowsToShow = options.get("numRows").map(_.toInt).getOrElse(20) - - // Truncate the displayed data if it is too long, by default it is true - private val isTruncated = options.get("truncate").map(_.toBoolean).getOrElse(true) +import java.util.Optional - // Track the batch id - private var lastBatchId = -1L - - override def addBatch(batchId: Long, data: DataFrame): Unit = synchronized { - val batchIdStr = if (batchId <= lastBatchId) { - s"Rerun batch: $batchId" - } else { - lastBatchId = batchId - s"Batch: $batchId" - } - - // scalastyle:off println - println("-------------------------------------------") - println(batchIdStr) - println("-------------------------------------------") - // scalastyle:off println - data.sparkSession.createDataFrame( - data.sparkSession.sparkContext.parallelize(data.collect()), data.schema) - .show(numRowsToShow, isTruncated) - } +import scala.collection.JavaConverters._ - override def toString(): String = s"ConsoleSink[numRows=$numRowsToShow, truncate=$isTruncated]" -} +import org.apache.spark.sql._ +import org.apache.spark.sql.execution.streaming.sources.ConsoleWriter +import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister} +import org.apache.spark.sql.sources.v2.{DataSourceV2, DataSourceV2Options} +import org.apache.spark.sql.sources.v2.streaming.MicroBatchWriteSupport +import org.apache.spark.sql.sources.v2.writer.DataSourceV2Writer +import org.apache.spark.sql.streaming.OutputMode +import org.apache.spark.sql.types.StructType case class ConsoleRelation(override val sqlContext: SQLContext, data: DataFrame) extends BaseRelation { override def schema: StructType = data.schema } -class ConsoleSinkProvider extends StreamSinkProvider +class ConsoleSinkProvider extends DataSourceV2 + with MicroBatchWriteSupport with DataSourceRegister with CreatableRelationProvider { - def createSink( - sqlContext: SQLContext, - parameters: Map[String, 
String], - partitionColumns: Seq[String], - outputMode: OutputMode): Sink = { - new ConsoleSink(parameters) + + override def createMicroBatchWriter( + queryId: String, + epochId: Long, + schema: StructType, + mode: OutputMode, + options: DataSourceV2Options): Optional[DataSourceV2Writer] = { + new ConsoleWriter(epochId, schema, options.asMap.asScala.toMap) } def createRelation( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWriter.scala new file mode 100644 index 0000000000000..a6fbdaaa3a2a2 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWriter.scala @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.sources + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.{Row, SparkSession} +import org.apache.spark.sql.sources.v2.writer.{DataSourceV2Writer, DataWriterFactory, WriterCommitMessage} +import org.apache.spark.sql.types.StructType + +class ConsoleWriter(batchId: Long, schema: StructType, options: Map[String, String]) + extends DataSourceV2Writer with Logging { + // Number of rows to display, by default 20 rows + private val numRowsToShow = options.get("numRows").map(_.toInt).getOrElse(20) + + // Truncate the displayed data if it is too long, by default it is true + private val isTruncated = options.get("truncate").map(_.toBoolean).getOrElse(true) + + assert(SparkSession.getActiveSession.isDefined) + private val spark = SparkSession.getActiveSession.get + + override def createWriterFactory(): DataWriterFactory[Row] = PackedRowWriterFactory + + override def commit(messages: Array[WriterCommitMessage]): Unit = synchronized { + val batch = messages.collect { + case PackedRowCommitMessage(rows) => rows + }.reduce(_ ++ _) + + // scalastyle:off println + println("-------------------------------------------") + println(s"Batch: $batchId") + println("-------------------------------------------") + // scalastyle:off println + spark.createDataFrame( + spark.sparkContext.parallelize(batch), schema) + .show(numRowsToShow, isTruncated) + } + + override def abort(messages: Array[WriterCommitMessage]): Unit = {} + + override def toString(): String = s"ConsoleWriter[numRows=$numRowsToShow, truncate=$isTruncated]" +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/PackedRowWriterFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/PackedRowWriterFactory.scala new file mode 100644 index 0000000000000..bcb9f05975767 --- /dev/null +++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/PackedRowWriterFactory.scala @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.sources + +import scala.collection.mutable + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.Row +import org.apache.spark.sql.sources.v2.writer.{DataWriter, DataWriterFactory, WriterCommitMessage} + +/** + * A simple [[DataWriterFactory]] whose tasks just pack rows into the commit message for delivery + * to the [[org.apache.spark.sql.sources.v2.writer.DataSourceV2Writer]] on the driver. + */ +case object PackedRowWriterFactory extends DataWriterFactory[Row] { + def createDataWriter(partitionId: Int, attemptNumber: Int): DataWriter[Row] = { + new PackedRowDataWriter() + } +} + +case class PackedRowCommitMessage(rows: Array[Row]) extends WriterCommitMessage + +class PackedRowDataWriter() extends DataWriter[Row] with Logging { + private val data = mutable.Buffer[Row]() + + override def write(row: Row): Unit = data.append(row) + + override def commit(): PackedRowCommitMessage = { + val msg = PackedRowCommitMessage(data.clone().toArray) + data.clear() + msg + } + + override def abort(): Unit = data.clear() +} From 71cc6e41cc19af8e672c67624ca16f330804ccc8 Mon Sep 17 00:00:00 2001 From: Jose Torres Date: Thu, 11 Jan 2018 19:27:42 -0800 Subject: [PATCH 02/16] add test --- .../sql/execution/streaming/console.scala | 2 +- .../sql/streaming/DataStreamWriter.scala | 16 ++--- .../sources/ConsoleWriterSuite.scala | 69 +++++++++++++++++++ 3 files changed, 77 insertions(+), 10 deletions(-) create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWriterSuite.scala diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala index 8969a645651d7..c9de3dcfeb145 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala @@ -46,7 +46,7 @@ class ConsoleSinkProvider extends DataSourceV2 schema: StructType, mode: OutputMode, options: DataSourceV2Options): Optional[DataSourceV2Writer] = { - new ConsoleWriter(epochId, schema, options.asMap.asScala.toMap) + Optional.of(new ConsoleWriter(epochId, schema, options.asMap.asScala.toMap)) } def createRelation( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala index b5b4a05ab4973..d113f830d7392 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala +++ 
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala @@ -29,7 +29,7 @@ import org.apache.spark.sql.execution.datasources.DataSource import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.execution.streaming.continuous.ContinuousTrigger import org.apache.spark.sql.execution.streaming.sources.{MemoryPlanV2, MemorySinkV2} -import org.apache.spark.sql.sources.v2.streaming.ContinuousWriteSupport +import org.apache.spark.sql.sources.v2.streaming.{ContinuousWriteSupport, MicroBatchWriteSupport} /** * Interface used to write a streaming `Dataset` to external storage systems (e.g. file systems, @@ -280,14 +280,12 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) { useTempCheckpointLocation = true, trigger = trigger) } else { - val sink = trigger match { - case _: ContinuousTrigger => - val ds = DataSource.lookupDataSource(source, df.sparkSession.sessionState.conf) - ds.newInstance() match { - case w: ContinuousWriteSupport => w - case _ => throw new AnalysisException( - s"Data source $source does not support continuous writing") - } + val ds = DataSource.lookupDataSource(source, df.sparkSession.sessionState.conf) + val sink = (ds.newInstance(), trigger) match { + case (w: ContinuousWriteSupport, _: ContinuousTrigger) => w + case (_, _: ContinuousTrigger) => throw new AnalysisException( + s"Data source $source does not support continuous writing") + case (w: MicroBatchWriteSupport, _) => w case _ => val ds = DataSource( df.sparkSession, diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWriterSuite.scala new file mode 100644 index 0000000000000..7140996e3d202 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWriterSuite.scala @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.streaming.sources + +import java.io.ByteArrayOutputStream + +import org.apache.spark.sql.execution.streaming.MemoryStream +import org.apache.spark.sql.streaming.StreamTest + +class ConsoleWriterSuite extends StreamTest { + import testImplicits._ + + test("console") { + val input = MemoryStream[Int] + + val captured = new ByteArrayOutputStream() + Console.withOut(captured) { + val query = input.toDF().writeStream.format("console").start() + try { + input.addData(1, 2, 3) + query.processAllAvailable() + input.addData(4, 5, 6) + query.processAllAvailable() + } finally { + query.stop() + } + } + + assert(captured.toString() == + """------------------------------------------- + |Batch: 0 + |------------------------------------------- + |+-----+ + ||value| + |+-----+ + || 1| + || 2| + || 3| + |+-----+ + | + |------------------------------------------- + |Batch: 1 + |------------------------------------------- + |+-----+ + ||value| + |+-----+ + || 4| + || 5| + || 6| + |+-----+ + | + |""".stripMargin) + } +} From e3af17c2486b24406358f6582b68defee2e5505e Mon Sep 17 00:00:00 2001 From: Jose Torres Date: Fri, 12 Jan 2018 10:30:05 -0800 Subject: [PATCH 03/16] allow empty batch --- .../execution/streaming/sources/ConsoleWriter.scala | 2 +- .../streaming/sources/ConsoleWriterSuite.scala | 10 ++++++++++ 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWriter.scala index a6fbdaaa3a2a2..37cf0de44df3f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWriter.scala @@ -38,7 +38,7 @@ class ConsoleWriter(batchId: Long, schema: StructType, options: Map[String, Stri override def commit(messages: Array[WriterCommitMessage]): Unit = synchronized { val batch = messages.collect { case PackedRowCommitMessage(rows) => rows - }.reduce(_ ++ _) + }.fold(Array())(_ ++ _) // scalastyle:off println println("-------------------------------------------") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWriterSuite.scala index 7140996e3d202..8adaf44ece305 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWriterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWriterSuite.scala @@ -36,6 +36,8 @@ class ConsoleWriterSuite extends StreamTest { query.processAllAvailable() input.addData(4, 5, 6) query.processAllAvailable() + input.addData() + query.processAllAvailable() } finally { query.stop() } @@ -64,6 +66,14 @@ class ConsoleWriterSuite extends StreamTest { || 6| |+-----+ | + |------------------------------------------- + |Batch: 2 + |------------------------------------------- + |+-----+ + ||value| + |+-----+ + |+-----+ + | |""".stripMargin) } } From 45df30a8b8504546c00b3ffa26dddf41343f8838 Mon Sep 17 00:00:00 2001 From: Jose Torres Date: Tue, 16 Jan 2018 14:41:19 -0800 Subject: [PATCH 04/16] the -> a --- .../execution/streaming/sources/PackedRowWriterFactory.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/PackedRowWriterFactory.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/PackedRowWriterFactory.scala index bcb9f05975767..399b049157727 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/PackedRowWriterFactory.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/PackedRowWriterFactory.scala @@ -25,7 +25,7 @@ import org.apache.spark.sql.sources.v2.writer.{DataWriter, DataWriterFactory, Wr /** * A simple [[DataWriterFactory]] whose tasks just pack rows into the commit message for delivery - * to the [[org.apache.spark.sql.sources.v2.writer.DataSourceV2Writer]] on the driver. + * to a [[org.apache.spark.sql.sources.v2.writer.DataSourceV2Writer]] on the driver. */ case object PackedRowWriterFactory extends DataWriterFactory[Row] { def createDataWriter(partitionId: Int, attemptNumber: Int): DataWriter[Row] = { From 7154f3430b24090d6154939c00f38348afe9fdae Mon Sep 17 00:00:00 2001 From: Jose Torres Date: Tue, 16 Jan 2018 14:42:07 -0800 Subject: [PATCH 05/16] rm redundant tests --- .../test/DataStreamReaderWriterSuite.scala | 25 ------------------- 1 file changed, 25 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala index aa163d2211c38..8212fb912ec57 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala @@ -422,21 +422,6 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter { } } - test("ConsoleSink can be correctly loaded") { - LastOptions.clear() - val df = spark.readStream - .format("org.apache.spark.sql.streaming.test") - .load() - - val sq = df.writeStream - .format("console") - .option("checkpointLocation", newMetadataDir) - .trigger(ProcessingTime(2.seconds)) - .start() - - sq.awaitTermination(2000L) - } - test("prevent all column partitioning") { withTempDir { dir => val path = dir.getCanonicalPath @@ -450,16 +435,6 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter { } } - test("ConsoleSink should not require checkpointLocation") { - LastOptions.clear() - val df = spark.readStream - .format("org.apache.spark.sql.streaming.test") - .load() - - val sq = df.writeStream.format("console").start() - sq.stop() - } - private def testMemorySinkCheckpointRecovery(chkLoc: String, provideInWriter: Boolean): Unit = { import testImplicits._ val ms = new MemoryStream[Int](0, sqlContext) From be880b19df0f707137c76f70bec1c5af71fe5a01 Mon Sep 17 00:00:00 2001 From: Jose Torres Date: Tue, 16 Jan 2018 14:48:24 -0800 Subject: [PATCH 06/16] fix options and add tests --- .../sql/execution/streaming/console.scala | 2 +- .../streaming/sources/ConsoleWriter.scala | 7 ++- .../sources/ConsoleWriterSuite.scala | 56 +++++++++++++++++++ 3 files changed, 61 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala index c9de3dcfeb145..94820376ff7e7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala @@ -46,7 +46,7 @@ class ConsoleSinkProvider extends DataSourceV2 schema: StructType, mode: OutputMode, options: 
DataSourceV2Options): Optional[DataSourceV2Writer] = { - Optional.of(new ConsoleWriter(epochId, schema, options.asMap.asScala.toMap)) + Optional.of(new ConsoleWriter(epochId, schema, options)) } def createRelation( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWriter.scala index 37cf0de44df3f..1a7275c1c4ffc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWriter.scala @@ -19,16 +19,17 @@ package org.apache.spark.sql.execution.streaming.sources import org.apache.spark.internal.Logging import org.apache.spark.sql.{Row, SparkSession} +import org.apache.spark.sql.sources.v2.DataSourceV2Options import org.apache.spark.sql.sources.v2.writer.{DataSourceV2Writer, DataWriterFactory, WriterCommitMessage} import org.apache.spark.sql.types.StructType -class ConsoleWriter(batchId: Long, schema: StructType, options: Map[String, String]) +class ConsoleWriter(batchId: Long, schema: StructType, options: DataSourceV2Options) extends DataSourceV2Writer with Logging { // Number of rows to display, by default 20 rows - private val numRowsToShow = options.get("numRows").map(_.toInt).getOrElse(20) + private val numRowsToShow = options.getInt("numRows", 20) // Truncate the displayed data if it is too long, by default it is true - private val isTruncated = options.get("truncate").map(_.toBoolean).getOrElse(true) + private val isTruncated = options.getBoolean("truncate", true) assert(SparkSession.getActiveSession.isDefined) private val spark = SparkSession.getActiveSession.get diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWriterSuite.scala index 8adaf44ece305..60ffee9b9b42c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWriterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWriterSuite.scala @@ -76,4 +76,60 @@ class ConsoleWriterSuite extends StreamTest { | |""".stripMargin) } + + test("console with numRows") { + val input = MemoryStream[Int] + + val captured = new ByteArrayOutputStream() + Console.withOut(captured) { + val query = input.toDF().writeStream.format("console").option("NUMROWS", 2).start() + try { + input.addData(1, 2, 3) + query.processAllAvailable() + } finally { + query.stop() + } + } + + assert(captured.toString() == + """------------------------------------------- + |Batch: 0 + |------------------------------------------- + |+-----+ + ||value| + |+-----+ + || 1| + || 2| + |+-----+ + |only showing top 2 rows + | + |""".stripMargin) + } + + test("console with truncation") { + val input = MemoryStream[String] + + val captured = new ByteArrayOutputStream() + Console.withOut(captured) { + val query = input.toDF().writeStream.format("console").option("TRUNCATE", true).start() + try { + input.addData("123456789012345678901234567890") + query.processAllAvailable() + } finally { + query.stop() + } + } + + assert(captured.toString() == + """------------------------------------------- + |Batch: 0 + |------------------------------------------- + |+--------------------+ + || value| + |+--------------------+ + ||12345678901234567...| + |+--------------------+ + | + |""".stripMargin) + } } From 
e4c64294f130dada6c345f55e9c6000fcd5009da Mon Sep 17 00:00:00 2001 From: Jose Torres Date: Tue, 16 Jan 2018 15:49:11 -0800 Subject: [PATCH 07/16] add data source write selection tests --- ...pache.spark.sql.sources.DataSourceRegister | 4 + .../sources/StreamingDataSourceV2Suite.scala | 199 ++++++++++++++++++ 2 files changed, 203 insertions(+) create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala diff --git a/sql/core/src/test/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/sql/core/src/test/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister index c6973bf41d34b..2405a9627982e 100644 --- a/sql/core/src/test/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister +++ b/sql/core/src/test/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister @@ -5,3 +5,7 @@ org.apache.spark.sql.sources.FakeSourceFour org.apache.fakesource.FakeExternalSourceOne org.apache.fakesource.FakeExternalSourceTwo org.apache.fakesource.FakeExternalSourceThree +org.apache.spark.sql.streaming.sources.FakeStreamingMicroBatchOnly +org.apache.spark.sql.streaming.sources.FakeStreamingContinuousOnly +org.apache.spark.sql.streaming.sources.FakeStreamingBothModes +org.apache.spark.sql.streaming.sources.FakeStreamingNeitherMode diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala new file mode 100644 index 0000000000000..1ecf8b47ee5ec --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.streaming.sources + +import java.util.Optional + +import org.apache.spark.sql.{AnalysisException, Row} +import org.apache.spark.sql.execution.streaming.{LongOffset, RateStreamOffset} +import org.apache.spark.sql.sources.DataSourceRegister +import org.apache.spark.sql.sources.v2.{DataSourceV2, DataSourceV2Options} +import org.apache.spark.sql.sources.v2.reader.ReadTask +import org.apache.spark.sql.sources.v2.streaming.{ContinuousReadSupport, ContinuousWriteSupport, MicroBatchReadSupport, MicroBatchWriteSupport} +import org.apache.spark.sql.sources.v2.streaming.reader.{ContinuousReader, MicroBatchReader, Offset, PartitionOffset} +import org.apache.spark.sql.sources.v2.streaming.writer.ContinuousWriter +import org.apache.spark.sql.sources.v2.writer.DataSourceV2Writer +import org.apache.spark.sql.streaming.{OutputMode, StreamTest, Trigger} +import org.apache.spark.sql.types.StructType +import org.apache.spark.util.Utils + +object FakeReader extends MicroBatchReader with ContinuousReader { + def setOffsetRange(start: Optional[Offset], end: Optional[Offset]): Unit = {} + def getStartOffset: Offset = RateStreamOffset(Map()) + def getEndOffset: Offset = RateStreamOffset(Map()) + def deserializeOffset(json: String): Offset = RateStreamOffset(Map()) + def commit(end: Offset): Unit = {} + def readSchema(): StructType = StructType(Seq()) + def createReadTasks(): java.util.ArrayList[ReadTask[Row]] = new java.util.ArrayList() + def stop(): Unit = {} + def mergeOffsets(offsets: Array[PartitionOffset]): Offset = RateStreamOffset(Map()) + def setOffset(start: Optional[Offset]): Unit = {} +} + +class FakeStreamingMicroBatchOnly extends DataSourceRegister + with DataSourceV2 with MicroBatchReadSupport with MicroBatchWriteSupport { + override def createMicroBatchReader( + schema: Optional[StructType], + checkpointLocation: String, + options: DataSourceV2Options): MicroBatchReader = FakeReader + + def createMicroBatchWriter( + queryId: String, + epochId: Long, + schema: StructType, + mode: OutputMode, + options: DataSourceV2Options): Optional[DataSourceV2Writer] = { + throw new IllegalStateException("fake sink - cannot actually write") + } + + override def shortName(): String = "fake-microbatch-only" +} + +class FakeStreamingContinuousOnly extends DataSourceRegister + with DataSourceV2 with ContinuousReadSupport with ContinuousWriteSupport { + override def createContinuousReader( + schema: Optional[StructType], + checkpointLocation: String, + options: DataSourceV2Options): ContinuousReader = FakeReader + + def createContinuousWriter( + queryId: String, + schema: StructType, + mode: OutputMode, + options: DataSourceV2Options): Optional[ContinuousWriter] = { + throw new IllegalStateException("fake sink - cannot actually write") + } + + override def shortName(): String = "fake-continuous-only" +} + +class FakeStreamingBothModes extends DataSourceRegister + with DataSourceV2 with MicroBatchReadSupport with ContinuousReadSupport + with MicroBatchWriteSupport with ContinuousWriteSupport { + override def createMicroBatchReader( + schema: Optional[StructType], + checkpointLocation: String, + options: DataSourceV2Options): MicroBatchReader = FakeReader + + def createMicroBatchWriter( + queryId: String, + epochId: Long, + schema: StructType, + mode: OutputMode, + options: DataSourceV2Options): Optional[DataSourceV2Writer] = { + throw new IllegalStateException("fake sink - cannot actually write") + } + + override def createContinuousReader( + schema: Optional[StructType], + 
checkpointLocation: String, + options: DataSourceV2Options): ContinuousReader = FakeReader + + def createContinuousWriter( + queryId: String, + schema: StructType, + mode: OutputMode, + options: DataSourceV2Options): Optional[ContinuousWriter] = { + throw new IllegalStateException("fake sink - cannot actually write") + } + + override def shortName(): String = "fake-both-modes" +} + +class FakeStreamingNeitherMode extends DataSourceRegister with DataSourceV2 { + override def shortName(): String = "fake-neither-mode" +} + +class StreamingDataSourceV2Suite extends StreamTest { + + private def df = spark.readStream.format("rate").load() + + override def beforeAll(): Unit = { + super.beforeAll() + val fakeCheckpoint = Utils.createTempDir() + spark.conf.set("spark.sql.streaming.checkpointLocation", fakeCheckpoint.getCanonicalPath) + } + + testQuietly("create microbatch with only microbatch support") { + val query = df.writeStream.format("fake-microbatch-only").start() + query.stop() + } + + testQuietly("create microbatch with both support") { + val query = df.writeStream.format("fake-both-modes").start() + query.stop() + } + + testQuietly("create continuous with only continuous support") { + val query = df.writeStream + .format("fake-continuous-only") + .trigger(Trigger.Continuous(100)) + .start() + query.stop() + } + + testQuietly("create continuous with both support") { + val query = df.writeStream + .format("fake-both-modes") + .trigger(Trigger.Continuous(100)) + .start() + query.stop() + } + + test("microbatch with only continuous support") { + val ex = intercept[UnsupportedOperationException] { + df.writeStream.format("fake-continuous-only").start() + } + + assert(ex.getMessage.contains( + "Data source fake-continuous-only does not support streamed writing")) + } + + test("microbatch with no support") { + val ex = intercept[UnsupportedOperationException] { + df.writeStream.format("fake-neither-mode").start() + } + + assert(ex.getMessage.contains( + "Data source fake-neither-mode does not support streamed writing")) + } + + test("continuous with only microbatch support") { + val ex = intercept[AnalysisException] { + df.writeStream + .format("fake-microbatch-only") + .trigger(Trigger.Continuous(100)) + .start() + } + + assert(ex.getMessage.contains( + "Data source fake-microbatch-only does not support continuous writing")) + } + + test("continuous with no support") { + val ex = intercept[AnalysisException] { + df.writeStream + .format("fake-neither-mode") + .trigger(Trigger.Continuous(100)) + .start() + } + + assert(ex.getMessage.contains( + "Data source fake-neither-mode does not support continuous writing")) + } +} From 8ce6f3881a1dabbc6cebf2702960961571fce00e Mon Sep 17 00:00:00 2001 From: Jose Torres Date: Tue, 16 Jan 2018 16:27:35 -0800 Subject: [PATCH 08/16] merge into console.scala --- ...pache.spark.sql.sources.DataSourceRegister | 2 +- .../streaming/sources/ConsoleWriter.scala | 57 ------------------- .../streaming/{ => sources}/console.scala | 41 +++++++++++-- 3 files changed, 37 insertions(+), 63 deletions(-) delete mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWriter.scala rename sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/{ => sources}/console.scala (61%) diff --git a/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister index 0259c774bbf4a..1fe7687088cda 100644 --- 
a/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister +++ b/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister @@ -4,7 +4,7 @@ org.apache.spark.sql.execution.datasources.json.JsonFileFormat org.apache.spark.sql.execution.datasources.orc.OrcFileFormat org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat org.apache.spark.sql.execution.datasources.text.TextFileFormat -org.apache.spark.sql.execution.streaming.ConsoleSinkProvider +org.apache.spark.sql.execution.streaming.sources.ConsoleSinkProvider org.apache.spark.sql.execution.streaming.TextSocketSourceProvider org.apache.spark.sql.execution.streaming.RateSourceProvider org.apache.spark.sql.execution.streaming.sources.RateSourceProviderV2 diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWriter.scala deleted file mode 100644 index 1a7275c1c4ffc..0000000000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWriter.scala +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.sql.execution.streaming.sources - -import org.apache.spark.internal.Logging -import org.apache.spark.sql.{Row, SparkSession} -import org.apache.spark.sql.sources.v2.DataSourceV2Options -import org.apache.spark.sql.sources.v2.writer.{DataSourceV2Writer, DataWriterFactory, WriterCommitMessage} -import org.apache.spark.sql.types.StructType - -class ConsoleWriter(batchId: Long, schema: StructType, options: DataSourceV2Options) - extends DataSourceV2Writer with Logging { - // Number of rows to display, by default 20 rows - private val numRowsToShow = options.getInt("numRows", 20) - - // Truncate the displayed data if it is too long, by default it is true - private val isTruncated = options.getBoolean("truncate", true) - - assert(SparkSession.getActiveSession.isDefined) - private val spark = SparkSession.getActiveSession.get - - override def createWriterFactory(): DataWriterFactory[Row] = PackedRowWriterFactory - - override def commit(messages: Array[WriterCommitMessage]): Unit = synchronized { - val batch = messages.collect { - case PackedRowCommitMessage(rows) => rows - }.fold(Array())(_ ++ _) - - // scalastyle:off println - println("-------------------------------------------") - println(s"Batch: $batchId") - println("-------------------------------------------") - // scalastyle:off println - spark.createDataFrame( - spark.sparkContext.parallelize(batch), schema) - .show(numRowsToShow, isTruncated) - } - - override def abort(messages: Array[WriterCommitMessage]): Unit = {} - - override def toString(): String = s"ConsoleWriter[numRows=$numRowsToShow, truncate=$isTruncated]" -} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/console.scala similarity index 61% rename from sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala rename to sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/console.scala index 94820376ff7e7..7eb454262a3c2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/console.scala @@ -15,21 +15,52 @@ * limitations under the License. 
*/ -package org.apache.spark.sql.execution.streaming +package org.apache.spark.sql.execution.streaming.sources import java.util.Optional -import scala.collection.JavaConverters._ - +import org.apache.spark.internal.Logging import org.apache.spark.sql._ -import org.apache.spark.sql.execution.streaming.sources.ConsoleWriter import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister} import org.apache.spark.sql.sources.v2.{DataSourceV2, DataSourceV2Options} import org.apache.spark.sql.sources.v2.streaming.MicroBatchWriteSupport -import org.apache.spark.sql.sources.v2.writer.DataSourceV2Writer +import org.apache.spark.sql.sources.v2.writer.{DataSourceV2Writer, DataWriterFactory, WriterCommitMessage} import org.apache.spark.sql.streaming.OutputMode import org.apache.spark.sql.types.StructType +class ConsoleWriter(batchId: Long, schema: StructType, options: DataSourceV2Options) + extends DataSourceV2Writer with Logging { + // Number of rows to display, by default 20 rows + private val numRowsToShow = options.getInt("numRows", 20) + + // Truncate the displayed data if it is too long, by default it is true + private val isTruncated = options.getBoolean("truncate", true) + + assert(SparkSession.getActiveSession.isDefined) + private val spark = SparkSession.getActiveSession.get + + override def createWriterFactory(): DataWriterFactory[Row] = PackedRowWriterFactory + + override def commit(messages: Array[WriterCommitMessage]): Unit = synchronized { + val batch = messages.collect { + case PackedRowCommitMessage(rows) => rows + }.fold(Array())(_ ++ _) + + // scalastyle:off println + println("-------------------------------------------") + println(s"Batch: $batchId") + println("-------------------------------------------") + // scalastyle:off println + spark.createDataFrame( + spark.sparkContext.parallelize(batch), schema) + .show(numRowsToShow, isTruncated) + } + + override def abort(messages: Array[WriterCommitMessage]): Unit = {} + + override def toString(): String = s"ConsoleWriter[numRows=$numRowsToShow, truncate=$isTruncated]" +} + case class ConsoleRelation(override val sqlContext: SQLContext, data: DataFrame) extends BaseRelation { override def schema: StructType = data.schema From d7b4571c493242a46430d17a1ff803145fbf79fd Mon Sep 17 00:00:00 2001 From: Jose Torres Date: Tue, 16 Jan 2018 16:28:37 -0800 Subject: [PATCH 09/16] Revert "merge into console.scala" This reverts commit 8ce6f3881a1dabbc6cebf2702960961571fce00e. 
--- ...pache.spark.sql.sources.DataSourceRegister | 2 +- .../streaming/{sources => }/console.scala | 41 ++----------- .../streaming/sources/ConsoleWriter.scala | 57 +++++++++++++++++++ 3 files changed, 63 insertions(+), 37 deletions(-) rename sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/{sources => }/console.scala (61%) create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWriter.scala diff --git a/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister index 1fe7687088cda..0259c774bbf4a 100644 --- a/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister +++ b/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister @@ -4,7 +4,7 @@ org.apache.spark.sql.execution.datasources.json.JsonFileFormat org.apache.spark.sql.execution.datasources.orc.OrcFileFormat org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat org.apache.spark.sql.execution.datasources.text.TextFileFormat -org.apache.spark.sql.execution.streaming.sources.ConsoleSinkProvider +org.apache.spark.sql.execution.streaming.ConsoleSinkProvider org.apache.spark.sql.execution.streaming.TextSocketSourceProvider org.apache.spark.sql.execution.streaming.RateSourceProvider org.apache.spark.sql.execution.streaming.sources.RateSourceProviderV2 diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/console.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala similarity index 61% rename from sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/console.scala rename to sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala index 7eb454262a3c2..94820376ff7e7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/console.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala @@ -15,52 +15,21 @@ * limitations under the License. 
*/ -package org.apache.spark.sql.execution.streaming.sources +package org.apache.spark.sql.execution.streaming import java.util.Optional -import org.apache.spark.internal.Logging +import scala.collection.JavaConverters._ + import org.apache.spark.sql._ +import org.apache.spark.sql.execution.streaming.sources.ConsoleWriter import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister} import org.apache.spark.sql.sources.v2.{DataSourceV2, DataSourceV2Options} import org.apache.spark.sql.sources.v2.streaming.MicroBatchWriteSupport -import org.apache.spark.sql.sources.v2.writer.{DataSourceV2Writer, DataWriterFactory, WriterCommitMessage} +import org.apache.spark.sql.sources.v2.writer.DataSourceV2Writer import org.apache.spark.sql.streaming.OutputMode import org.apache.spark.sql.types.StructType -class ConsoleWriter(batchId: Long, schema: StructType, options: DataSourceV2Options) - extends DataSourceV2Writer with Logging { - // Number of rows to display, by default 20 rows - private val numRowsToShow = options.getInt("numRows", 20) - - // Truncate the displayed data if it is too long, by default it is true - private val isTruncated = options.getBoolean("truncate", true) - - assert(SparkSession.getActiveSession.isDefined) - private val spark = SparkSession.getActiveSession.get - - override def createWriterFactory(): DataWriterFactory[Row] = PackedRowWriterFactory - - override def commit(messages: Array[WriterCommitMessage]): Unit = synchronized { - val batch = messages.collect { - case PackedRowCommitMessage(rows) => rows - }.fold(Array())(_ ++ _) - - // scalastyle:off println - println("-------------------------------------------") - println(s"Batch: $batchId") - println("-------------------------------------------") - // scalastyle:off println - spark.createDataFrame( - spark.sparkContext.parallelize(batch), schema) - .show(numRowsToShow, isTruncated) - } - - override def abort(messages: Array[WriterCommitMessage]): Unit = {} - - override def toString(): String = s"ConsoleWriter[numRows=$numRowsToShow, truncate=$isTruncated]" -} - case class ConsoleRelation(override val sqlContext: SQLContext, data: DataFrame) extends BaseRelation { override def schema: StructType = data.schema diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWriter.scala new file mode 100644 index 0000000000000..1a7275c1c4ffc --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWriter.scala @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.streaming.sources + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.{Row, SparkSession} +import org.apache.spark.sql.sources.v2.DataSourceV2Options +import org.apache.spark.sql.sources.v2.writer.{DataSourceV2Writer, DataWriterFactory, WriterCommitMessage} +import org.apache.spark.sql.types.StructType + +class ConsoleWriter(batchId: Long, schema: StructType, options: DataSourceV2Options) + extends DataSourceV2Writer with Logging { + // Number of rows to display, by default 20 rows + private val numRowsToShow = options.getInt("numRows", 20) + + // Truncate the displayed data if it is too long, by default it is true + private val isTruncated = options.getBoolean("truncate", true) + + assert(SparkSession.getActiveSession.isDefined) + private val spark = SparkSession.getActiveSession.get + + override def createWriterFactory(): DataWriterFactory[Row] = PackedRowWriterFactory + + override def commit(messages: Array[WriterCommitMessage]): Unit = synchronized { + val batch = messages.collect { + case PackedRowCommitMessage(rows) => rows + }.fold(Array())(_ ++ _) + + // scalastyle:off println + println("-------------------------------------------") + println(s"Batch: $batchId") + println("-------------------------------------------") + // scalastyle:off println + spark.createDataFrame( + spark.sparkContext.parallelize(batch), schema) + .show(numRowsToShow, isTruncated) + } + + override def abort(messages: Array[WriterCommitMessage]): Unit = {} + + override def toString(): String = s"ConsoleWriter[numRows=$numRowsToShow, truncate=$isTruncated]" +} From c0ec93f99ab34a4201c1161bc68a5e72e8fe553a Mon Sep 17 00:00:00 2001 From: Jose Torres Date: Tue, 16 Jan 2018 19:53:28 -0800 Subject: [PATCH 10/16] test all combinations --- .../streaming/MicroBatchExecution.scala | 7 +- .../sql/streaming/DataStreamWriter.scala | 2 +- ...pache.spark.sql.sources.DataSourceRegister | 12 +- .../sources/StreamingDataSourceV2Suite.scala | 245 +++++++++++------- 4 files changed, 161 insertions(+), 105 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala index 70407f0580f97..7c3804547b736 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala @@ -91,11 +91,14 @@ class MicroBatchExecution( nextSourceId += 1 StreamingExecutionRelation(reader, output)(sparkSession) }) - case s @ StreamingRelationV2(_, _, _, output, v1Relation) => + case s @ StreamingRelationV2(_, sourceName, _, output, v1Relation) => v2ToExecutionRelationMap.getOrElseUpdate(s, { // Materialize source to avoid creating it in every batch val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId" - assert(v1Relation.isDefined, "v2 execution didn't match but v1 was unavailable") + if (v1Relation.isEmpty) { + throw new UnsupportedOperationException( + s"Data source $sourceName does not support microbatch processing.") + } val source = v1Relation.get.dataSource.createSource(metadataPath) nextSourceId += 1 StreamingExecutionRelation(source, output)(sparkSession) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala index 3c626752c89b8..d19897202d9c6 100644 --- 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala @@ -283,7 +283,7 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) { val ds = DataSource.lookupDataSource(source, df.sparkSession.sessionState.conf) val sink = (ds.newInstance(), trigger) match { case (w: ContinuousWriteSupport, _: ContinuousTrigger) => w - case (_, _: ContinuousTrigger) => throw new AnalysisException( + case (_, _: ContinuousTrigger) => throw new UnsupportedOperationException( s"Data source $source does not support continuous writing") case (w: MicroBatchWriteSupport, _) => w case _ => diff --git a/sql/core/src/test/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/sql/core/src/test/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister index 2405a9627982e..a0b25b4e82364 100644 --- a/sql/core/src/test/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister +++ b/sql/core/src/test/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister @@ -5,7 +5,11 @@ org.apache.spark.sql.sources.FakeSourceFour org.apache.fakesource.FakeExternalSourceOne org.apache.fakesource.FakeExternalSourceTwo org.apache.fakesource.FakeExternalSourceThree -org.apache.spark.sql.streaming.sources.FakeStreamingMicroBatchOnly -org.apache.spark.sql.streaming.sources.FakeStreamingContinuousOnly -org.apache.spark.sql.streaming.sources.FakeStreamingBothModes -org.apache.spark.sql.streaming.sources.FakeStreamingNeitherMode +org.apache.spark.sql.streaming.sources.FakeReadMicroBatchOnly +org.apache.spark.sql.streaming.sources.FakeReadContinuousOnly +org.apache.spark.sql.streaming.sources.FakeReadBothModes +org.apache.spark.sql.streaming.sources.FakeReadNeitherMode +org.apache.spark.sql.streaming.sources.FakeWriteMicroBatchOnly +org.apache.spark.sql.streaming.sources.FakeWriteContinuousOnly +org.apache.spark.sql.streaming.sources.FakeWriteBothModes +org.apache.spark.sql.streaming.sources.FakeWriteNeitherMode diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala index 1ecf8b47ee5ec..7c1234688136f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala @@ -20,7 +20,9 @@ package org.apache.spark.sql.streaming.sources import java.util.Optional import org.apache.spark.sql.{AnalysisException, Row} +import org.apache.spark.sql.execution.datasources.DataSource import org.apache.spark.sql.execution.streaming.{LongOffset, RateStreamOffset} +import org.apache.spark.sql.execution.streaming.continuous.ContinuousTrigger import org.apache.spark.sql.sources.DataSourceRegister import org.apache.spark.sql.sources.v2.{DataSourceV2, DataSourceV2Options} import org.apache.spark.sql.sources.v2.reader.ReadTask @@ -28,30 +30,41 @@ import org.apache.spark.sql.sources.v2.streaming.{ContinuousReadSupport, Continu import org.apache.spark.sql.sources.v2.streaming.reader.{ContinuousReader, MicroBatchReader, Offset, PartitionOffset} import org.apache.spark.sql.sources.v2.streaming.writer.ContinuousWriter import org.apache.spark.sql.sources.v2.writer.DataSourceV2Writer -import org.apache.spark.sql.streaming.{OutputMode, StreamTest, Trigger} +import 
org.apache.spark.sql.streaming.{OutputMode, StreamingQueryException, StreamTest, Trigger} import org.apache.spark.sql.types.StructType import org.apache.spark.util.Utils -object FakeReader extends MicroBatchReader with ContinuousReader { +case class FakeReader() extends MicroBatchReader with ContinuousReader { def setOffsetRange(start: Optional[Offset], end: Optional[Offset]): Unit = {} def getStartOffset: Offset = RateStreamOffset(Map()) def getEndOffset: Offset = RateStreamOffset(Map()) def deserializeOffset(json: String): Offset = RateStreamOffset(Map()) def commit(end: Offset): Unit = {} def readSchema(): StructType = StructType(Seq()) - def createReadTasks(): java.util.ArrayList[ReadTask[Row]] = new java.util.ArrayList() def stop(): Unit = {} def mergeOffsets(offsets: Array[PartitionOffset]): Offset = RateStreamOffset(Map()) def setOffset(start: Optional[Offset]): Unit = {} + + def createReadTasks(): java.util.ArrayList[ReadTask[Row]] = { + throw new IllegalStateException("fake source - cannot actually read") + } } -class FakeStreamingMicroBatchOnly extends DataSourceRegister - with DataSourceV2 with MicroBatchReadSupport with MicroBatchWriteSupport { +trait FakeMicroBatchReadSupport extends MicroBatchReadSupport { override def createMicroBatchReader( schema: Optional[StructType], checkpointLocation: String, - options: DataSourceV2Options): MicroBatchReader = FakeReader + options: DataSourceV2Options): MicroBatchReader = FakeReader() +} + +trait FakeContinuousReadSupport extends ContinuousReadSupport { + override def createContinuousReader( + schema: Optional[StructType], + checkpointLocation: String, + options: DataSourceV2Options): ContinuousReader = FakeReader() +} +trait FakeMicroBatchWriteSupport extends MicroBatchWriteSupport { def createMicroBatchWriter( queryId: String, epochId: Long, @@ -60,17 +73,9 @@ class FakeStreamingMicroBatchOnly extends DataSourceRegister options: DataSourceV2Options): Optional[DataSourceV2Writer] = { throw new IllegalStateException("fake sink - cannot actually write") } - - override def shortName(): String = "fake-microbatch-only" } -class FakeStreamingContinuousOnly extends DataSourceRegister - with DataSourceV2 with ContinuousReadSupport with ContinuousWriteSupport { - override def createContinuousReader( - schema: Optional[StructType], - checkpointLocation: String, - options: DataSourceV2Options): ContinuousReader = FakeReader - +trait FakeContinuousWriteSupport extends ContinuousWriteSupport { def createContinuousWriter( queryId: String, schema: StructType, @@ -78,50 +83,43 @@ class FakeStreamingContinuousOnly extends DataSourceRegister options: DataSourceV2Options): Optional[ContinuousWriter] = { throw new IllegalStateException("fake sink - cannot actually write") } +} - override def shortName(): String = "fake-continuous-only" +class FakeReadMicroBatchOnly extends DataSourceRegister with FakeMicroBatchReadSupport { + override def shortName(): String = "fake-read-microbatch-only" } -class FakeStreamingBothModes extends DataSourceRegister - with DataSourceV2 with MicroBatchReadSupport with ContinuousReadSupport - with MicroBatchWriteSupport with ContinuousWriteSupport { - override def createMicroBatchReader( - schema: Optional[StructType], - checkpointLocation: String, - options: DataSourceV2Options): MicroBatchReader = FakeReader +class FakeReadContinuousOnly extends DataSourceRegister with FakeContinuousReadSupport { + override def shortName(): String = "fake-read-continuous-only" +} - def createMicroBatchWriter( - queryId: String, - epochId: 
Long, - schema: StructType, - mode: OutputMode, - options: DataSourceV2Options): Optional[DataSourceV2Writer] = { - throw new IllegalStateException("fake sink - cannot actually write") - } +class FakeReadBothModes extends DataSourceRegister + with FakeMicroBatchReadSupport with FakeContinuousReadSupport { + override def shortName(): String = "fake-read-microbatch-continuous" +} - override def createContinuousReader( - schema: Optional[StructType], - checkpointLocation: String, - options: DataSourceV2Options): ContinuousReader = FakeReader +class FakeReadNeitherMode extends DataSourceRegister { + override def shortName(): String = "fake-read-neither-mode" +} - def createContinuousWriter( - queryId: String, - schema: StructType, - mode: OutputMode, - options: DataSourceV2Options): Optional[ContinuousWriter] = { - throw new IllegalStateException("fake sink - cannot actually write") - } +class FakeWriteMicroBatchOnly extends DataSourceRegister with FakeMicroBatchWriteSupport { + override def shortName(): String = "fake-write-microbatch-only" +} - override def shortName(): String = "fake-both-modes" +class FakeWriteContinuousOnly extends DataSourceRegister with FakeContinuousWriteSupport { + override def shortName(): String = "fake-write-continuous-only" } -class FakeStreamingNeitherMode extends DataSourceRegister with DataSourceV2 { - override def shortName(): String = "fake-neither-mode" +class FakeWriteBothModes extends DataSourceRegister + with FakeMicroBatchWriteSupport with FakeContinuousWriteSupport { + override def shortName(): String = "fake-write-microbatch-continuous" } -class StreamingDataSourceV2Suite extends StreamTest { +class FakeWriteNeitherMode extends DataSourceRegister { + override def shortName(): String = "fake-write-neither-mode" +} - private def df = spark.readStream.format("rate").load() +class StreamingDataSourceV2Suite extends StreamTest { override def beforeAll(): Unit = { super.beforeAll() @@ -129,71 +127,122 @@ class StreamingDataSourceV2Suite extends StreamTest { spark.conf.set("spark.sql.streaming.checkpointLocation", fakeCheckpoint.getCanonicalPath) } - testQuietly("create microbatch with only microbatch support") { - val query = df.writeStream.format("fake-microbatch-only").start() - query.stop() - } - - testQuietly("create microbatch with both support") { - val query = df.writeStream.format("fake-both-modes").start() - query.stop() - } - - testQuietly("create continuous with only continuous support") { - val query = df.writeStream - .format("fake-continuous-only") - .trigger(Trigger.Continuous(100)) - .start() - query.stop() - } - - testQuietly("create continuous with both support") { - val query = df.writeStream - .format("fake-both-modes") - .trigger(Trigger.Continuous(100)) + val readFormats = Seq( + "fake-read-microbatch-only", + "fake-read-continuous-only", + "fake-read-microbatch-continuous", + "fake-read-neither-mode") + val writeFormats = Seq( + "fake-write-microbatch-only", + "fake-write-continuous-only", + "fake-write-microbatch-continuous", + "fake-write-neither-mode") + val triggers = Seq( + Trigger.Once(), + Trigger.ProcessingTime(1000), + Trigger.Continuous(1000)) + + private def testPositiveCase(readFormat: String, writeFormat: String, trigger: Trigger) = { + val query = spark.readStream + .format(readFormat) + .load() + .writeStream + .format(writeFormat) + .trigger(trigger) .start() query.stop() } - test("microbatch with only continuous support") { + private def testUnsupportedOperationCase( + readFormat: String, + writeFormat: String, + 
trigger: Trigger, + errorMsg: String) = { val ex = intercept[UnsupportedOperationException] { - df.writeStream.format("fake-continuous-only").start() + testPositiveCase(readFormat, writeFormat, trigger) } - - assert(ex.getMessage.contains( - "Data source fake-continuous-only does not support streamed writing")) + assert(ex.getMessage.contains(errorMsg)) } - test("microbatch with no support") { - val ex = intercept[UnsupportedOperationException] { - df.writeStream.format("fake-neither-mode").start() + private def testLogicalPlanCase( + readFormat: String, + writeFormat: String, + trigger: Trigger, + errorMsg: String) = { + val ex = intercept[StreamingQueryException] { + spark.readStream + .format(readFormat) + .load() + .writeStream + .format(writeFormat) + .trigger(trigger) + .start() + .processAllAvailable() } - - assert(ex.getMessage.contains( - "Data source fake-neither-mode does not support streamed writing")) + assert(ex.cause != null) + assert(ex.cause.getMessage.contains(errorMsg)) } - test("continuous with only microbatch support") { - val ex = intercept[AnalysisException] { - df.writeStream - .format("fake-microbatch-only") - .trigger(Trigger.Continuous(100)) - .start() + // Get a list of (read, write, trigger) tuples for test cases. + val cases = readFormats.flatMap { read => + writeFormats.flatMap { write => + triggers.map(t => (write, t)) + }.map { + case (write, t) => (read, write, t) } - - assert(ex.getMessage.contains( - "Data source fake-microbatch-only does not support continuous writing")) } - test("continuous with no support") { - val ex = intercept[AnalysisException] { - df.writeStream - .format("fake-neither-mode") - .trigger(Trigger.Continuous(100)) - .start() + for ((read, write, trigger) <- cases) { + testQuietly(s"stream with read format $read, write format $write, trigger $trigger") { + val readSource = DataSource.lookupDataSource(read, spark.sqlContext.conf).newInstance() + val writeSource = DataSource.lookupDataSource(write, spark.sqlContext.conf).newInstance() + (readSource, writeSource, trigger) match { + // Valid microbatch queries. + case (_: MicroBatchReadSupport, _: MicroBatchWriteSupport, t) + if !t.isInstanceOf[ContinuousTrigger] => + testPositiveCase(read, write, trigger) + + // Valid continuous queries. 
+ case (_: ContinuousReadSupport, _: ContinuousWriteSupport, _: ContinuousTrigger) => + testPositiveCase(read, write, trigger) + + // Invalid - can't read at all + case (r, _, _) + if !r.isInstanceOf[MicroBatchReadSupport] + && !r.isInstanceOf[ContinuousReadSupport] => + testUnsupportedOperationCase(read, write, trigger, + s"Data source $read does not support streamed reading") + + // Invalid - trigger is continuous but writer is not + case (_, w, _: ContinuousTrigger) if !w.isInstanceOf[ContinuousWriteSupport] => + testUnsupportedOperationCase(read, write, trigger, + s"Data source $write does not support continuous writing") + + // Invalid - can't write at all + case (_, w, _) + if !w.isInstanceOf[MicroBatchWriteSupport] + && !w.isInstanceOf[ContinuousWriteSupport] => + testUnsupportedOperationCase(read, write, trigger, + s"Data source $write does not support streamed writing") + + // Invalid - trigger and writer are continuous but reader is not + case (r, _: ContinuousWriteSupport, _: ContinuousTrigger) + if !r.isInstanceOf[ContinuousReadSupport] => + testLogicalPlanCase(read, write, trigger, + s"Data source $read does not support continuous processing") + + // Invalid - trigger is microbatch but writer is not + case (_, w, t) + if !w.isInstanceOf[MicroBatchWriteSupport] && !t.isInstanceOf[ContinuousTrigger] => + testUnsupportedOperationCase(read, write, trigger, + s"Data source $write does not support streamed writing") + + // Invalid - trigger and writer are microbatch but reader is not + case (r, _, t) + if !r.isInstanceOf[MicroBatchReadSupport] && !t.isInstanceOf[ContinuousTrigger] => + testLogicalPlanCase(read, write, trigger, + s"Data source $read does not support microbatch processing") + } } - - assert(ex.getMessage.contains( - "Data source fake-neither-mode does not support continuous writing")) } } From 516fd4a919cbc421f0565357ef6696b7b8ba0728 Mon Sep 17 00:00:00 2001 From: Jose Torres Date: Tue, 16 Jan 2018 20:28:20 -0800 Subject: [PATCH 11/16] fix test --- .../continuous/ContinuousExecution.scala | 4 ++-- .../sources/StreamingDataSourceV2Suite.scala | 23 ++++++++++--------- 2 files changed, 14 insertions(+), 13 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala index 667410ef9f1c6..a4773e3d7db52 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala @@ -54,7 +54,7 @@ class ContinuousExecution( sparkSession, name, checkpointRoot, analyzedPlan, sink, trigger, triggerClock, outputMode, deleteCheckpointOnStop) { - @volatile protected var continuousSources: Seq[ContinuousReader] = _ + @volatile protected var continuousSources: Seq[ContinuousReader] = Seq() override protected def sources: Seq[BaseStreamingSource] = continuousSources override lazy val logicalPlan: LogicalPlan = { @@ -69,7 +69,7 @@ class ContinuousExecution( ContinuousExecutionRelation(source, extraReaderOptions, output)(sparkSession) }) case StreamingRelationV2(_, sourceName, _, _, _) => - throw new AnalysisException( + throw new UnsupportedOperationException( s"Data source $sourceName does not support continuous processing.") } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala index 7c1234688136f..850cb2540107d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala @@ -169,18 +169,19 @@ class StreamingDataSourceV2Suite extends StreamTest { writeFormat: String, trigger: Trigger, errorMsg: String) = { - val ex = intercept[StreamingQueryException] { - spark.readStream - .format(readFormat) - .load() - .writeStream - .format(writeFormat) - .trigger(trigger) - .start() - .processAllAvailable() + val query = spark.readStream + .format(readFormat) + .load() + .writeStream + .format(writeFormat) + .trigger(trigger) + .start() + + eventually(timeout(streamingTimeout)) { + assert(query.exception.isDefined) + assert(query.exception.get.cause != null) + assert(query.exception.get.cause.getMessage.contains(errorMsg)) } - assert(ex.cause != null) - assert(ex.cause.getMessage.contains(errorMsg)) } // Get a list of (read, write, trigger) tuples for test cases. From 7abd2b2bb6a8d7ba73caf1f95ae093731ba825f7 Mon Sep 17 00:00:00 2001 From: Jose Torres Date: Wed, 17 Jan 2018 10:21:35 -0800 Subject: [PATCH 12/16] add docs --- .../execution/streaming/sources/ConsoleWriter.scala | 7 +++++++ .../streaming/sources/PackedRowWriterFactory.scala | 10 ++++++++++ 2 files changed, 17 insertions(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWriter.scala index 1a7275c1c4ffc..f30c39169ebaf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWriter.scala @@ -23,6 +23,13 @@ import org.apache.spark.sql.sources.v2.DataSourceV2Options import org.apache.spark.sql.sources.v2.writer.{DataSourceV2Writer, DataWriterFactory, WriterCommitMessage} import org.apache.spark.sql.types.StructType +/** + * A [[DataSourceV2Writer]] that collects results to the driver and prints them in the console. + * Generated by [[org.apache.spark.sql.execution.streaming.ConsoleSinkProvider]]. + * + * This sink should not be used for production, as it requires sending all rows to the driver + * and does not support recovery. + */ class ConsoleWriter(batchId: Long, schema: StructType, options: DataSourceV2Options) extends DataSourceV2Writer with Logging { // Number of rows to display, by default 20 rows diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/PackedRowWriterFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/PackedRowWriterFactory.scala index 399b049157727..a42ade07958b9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/PackedRowWriterFactory.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/PackedRowWriterFactory.scala @@ -26,6 +26,9 @@ import org.apache.spark.sql.sources.v2.writer.{DataWriter, DataWriterFactory, Wr /** * A simple [[DataWriterFactory]] whose tasks just pack rows into the commit message for delivery * to a [[org.apache.spark.sql.sources.v2.writer.DataSourceV2Writer]] on the driver. + * + * Note that, because it sends all rows to the driver, this factory will generally be unsuitable + * for production-quality sinks. 
It's intended for use in tests. */ case object PackedRowWriterFactory extends DataWriterFactory[Row] { def createDataWriter(partitionId: Int, attemptNumber: Int): DataWriter[Row] = { @@ -33,8 +36,15 @@ case object PackedRowWriterFactory extends DataWriterFactory[Row] { } } +/** + * Commit message for a [[PackedRowDataWriter]], containing all the rows written in the most + * recent interval. + */ case class PackedRowCommitMessage(rows: Array[Row]) extends WriterCommitMessage +/** + * A simple [[DataWriter]] that just sends all the rows it's received as a commit message. + */ class PackedRowDataWriter() extends DataWriter[Row] with Logging { private val data = mutable.Buffer[Row]() From da13f37b41f0c8cc5bdaeb67b65ea564613604ad Mon Sep 17 00:00:00 2001 From: Jose Torres Date: Wed, 17 Jan 2018 10:21:47 -0800 Subject: [PATCH 13/16] no redundant clone --- .../execution/streaming/sources/PackedRowWriterFactory.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/PackedRowWriterFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/PackedRowWriterFactory.scala index a42ade07958b9..9282ba05bdb7b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/PackedRowWriterFactory.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/PackedRowWriterFactory.scala @@ -51,7 +51,7 @@ class PackedRowDataWriter() extends DataWriter[Row] with Logging { override def write(row: Row): Unit = data.append(row) override def commit(): PackedRowCommitMessage = { - val msg = PackedRowCommitMessage(data.clone().toArray) + val msg = PackedRowCommitMessage(data.toArray) data.clear() msg } From a9d6b82642e31ea7714c2bc5c4f0fac411832a1f Mon Sep 17 00:00:00 2001 From: Jose Torres Date: Wed, 17 Jan 2018 10:22:28 -0800 Subject: [PATCH 14/16] rename method --- .../sources/StreamingDataSourceV2Suite.scala | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala index 850cb2540107d..9438130b6d72b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala @@ -153,7 +153,7 @@ class StreamingDataSourceV2Suite extends StreamTest { query.stop() } - private def testUnsupportedOperationCase( + private def testNegativeCase( readFormat: String, writeFormat: String, trigger: Trigger, @@ -164,7 +164,7 @@ class StreamingDataSourceV2Suite extends StreamTest { assert(ex.getMessage.contains(errorMsg)) } - private def testLogicalPlanCase( + private def testLogicalPlanNegativeCase( readFormat: String, writeFormat: String, trigger: Trigger, @@ -211,37 +211,37 @@ class StreamingDataSourceV2Suite extends StreamTest { case (r, _, _) if !r.isInstanceOf[MicroBatchReadSupport] && !r.isInstanceOf[ContinuousReadSupport] => - testUnsupportedOperationCase(read, write, trigger, + testNegativeCase(read, write, trigger, s"Data source $read does not support streamed reading") // Invalid - trigger is continuous but writer is not case (_, w, _: ContinuousTrigger) if !w.isInstanceOf[ContinuousWriteSupport] => - testUnsupportedOperationCase(read, write, trigger, + testNegativeCase(read, write, trigger, s"Data source 
$write does not support continuous writing") // Invalid - can't write at all case (_, w, _) if !w.isInstanceOf[MicroBatchWriteSupport] && !w.isInstanceOf[ContinuousWriteSupport] => - testUnsupportedOperationCase(read, write, trigger, + testNegativeCase(read, write, trigger, s"Data source $write does not support streamed writing") // Invalid - trigger and writer are continuous but reader is not case (r, _: ContinuousWriteSupport, _: ContinuousTrigger) if !r.isInstanceOf[ContinuousReadSupport] => - testLogicalPlanCase(read, write, trigger, + testLogicalPlanNegativeCase(read, write, trigger, s"Data source $read does not support continuous processing") // Invalid - trigger is microbatch but writer is not case (_, w, t) if !w.isInstanceOf[MicroBatchWriteSupport] && !t.isInstanceOf[ContinuousTrigger] => - testUnsupportedOperationCase(read, write, trigger, + testNegativeCase(read, write, trigger, s"Data source $write does not support streamed writing") // Invalid - trigger and writer are microbatch but reader is not case (r, _, t) if !r.isInstanceOf[MicroBatchReadSupport] && !t.isInstanceOf[ContinuousTrigger] => - testLogicalPlanCase(read, write, trigger, + testLogicalPlanNegativeCase(read, write, trigger, s"Data source $read does not support microbatch processing") } } From 99109a40811d52cb7356b212cf38a3273fd76171 Mon Sep 17 00:00:00 2001 From: Jose Torres Date: Wed, 17 Jan 2018 14:21:59 -0800 Subject: [PATCH 15/16] initialize continuous plan non-lazy --- .../streaming/continuous/ContinuousExecution.scala | 5 +---- .../sql/streaming/sources/StreamingDataSourceV2Suite.scala | 6 +++--- 2 files changed, 4 insertions(+), 7 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala index a4773e3d7db52..134d494aafd81 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala @@ -57,10 +57,7 @@ class ContinuousExecution( @volatile protected var continuousSources: Seq[ContinuousReader] = Seq() override protected def sources: Seq[BaseStreamingSource] = continuousSources - override lazy val logicalPlan: LogicalPlan = { - assert(queryExecutionThread eq Thread.currentThread, - "logicalPlan must be initialized in StreamExecutionThread " + - s"but the current thread was ${Thread.currentThread}") + override val logicalPlan: LogicalPlan = { val toExecutionRelationMap = MutableMap[StreamingRelationV2, ContinuousExecutionRelation]() analyzedPlan.transform { case r @ StreamingRelationV2( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala index 9438130b6d72b..f152174b0a7f0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala @@ -164,7 +164,7 @@ class StreamingDataSourceV2Suite extends StreamTest { assert(ex.getMessage.contains(errorMsg)) } - private def testLogicalPlanNegativeCase( + private def testPostCreationNegativeCase( readFormat: String, writeFormat: String, trigger: Trigger, @@ -229,7 +229,7 @@ class StreamingDataSourceV2Suite extends StreamTest { // Invalid - trigger 
and writer are continuous but reader is not case (r, _: ContinuousWriteSupport, _: ContinuousTrigger) if !r.isInstanceOf[ContinuousReadSupport] => - testLogicalPlanNegativeCase(read, write, trigger, + testNegativeCase(read, write, trigger, s"Data source $read does not support continuous processing") // Invalid - trigger is microbatch but writer is not @@ -241,7 +241,7 @@ class StreamingDataSourceV2Suite extends StreamTest { // Invalid - trigger and writer are microbatch but reader is not case (r, _, t) if !r.isInstanceOf[MicroBatchReadSupport] && !t.isInstanceOf[ContinuousTrigger] => - testLogicalPlanNegativeCase(read, write, trigger, + testPostCreationNegativeCase(read, write, trigger, s"Data source $read does not support microbatch processing") } } From 278eeb460d20ccc5b0e61d6a952c62f37bc1e0e2 Mon Sep 17 00:00:00 2001 From: Jose Torres Date: Wed, 17 Jan 2018 14:22:27 -0800 Subject: [PATCH 16/16] use flatten --- .../spark/sql/execution/streaming/sources/ConsoleWriter.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWriter.scala index f30c39169ebaf..361979984bbec 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWriter.scala @@ -46,7 +46,7 @@ class ConsoleWriter(batchId: Long, schema: StructType, options: DataSourceV2Opti override def commit(messages: Array[WriterCommitMessage]): Unit = synchronized { val batch = messages.collect { case PackedRowCommitMessage(rows) => rows - }.fold(Array())(_ ++ _) + }.flatten // scalastyle:off println println("-------------------------------------------")
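As an aside on the final hunk above: `flatten` over the collected per-partition row arrays yields the same result as the previous `fold(Array())(_ ++ _)`, just more directly. Below is a minimal, self-contained Scala sketch of that equivalence, using `Int` stand-ins for rows; the object name and values are purely illustrative and are not part of the patch.

object FlattenEquivalenceSketch {
  def main(args: Array[String]): Unit = {
    // Stand-ins for the row batches that each PackedRowCommitMessage would carry.
    val perPartitionRows: Array[Array[Int]] = Array(Array(1, 2), Array(3), Array.empty[Int])

    // Old formulation: pairwise concatenation starting from an empty array.
    val folded: Array[Int] = perPartitionRows.fold(Array.empty[Int])(_ ++ _)

    // New formulation: flatten concatenates the nested arrays in one step.
    val flattened: Array[Int] = perPartitionRows.flatten

    assert(folded.sameElements(flattened)) // both yield Array(1, 2, 3)
    println(flattened.mkString(", "))
  }
}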