From 8ce6f3881a1dabbc6cebf2702960961571fce00e Mon Sep 17 00:00:00 2001
From: Jose Torres
Date: Tue, 16 Jan 2018 16:27:35 -0800
Subject: [PATCH] merge into console.scala

---
 ...pache.spark.sql.sources.DataSourceRegister |  2 +-
 .../streaming/sources/ConsoleWriter.scala     | 57 -------------------
 .../streaming/{ => sources}/console.scala     | 41 +++++++++++--
 3 files changed, 37 insertions(+), 63 deletions(-)
 delete mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWriter.scala
 rename sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/{ => sources}/console.scala (61%)

diff --git a/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
index 0259c774bbf4a..1fe7687088cda 100644
--- a/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
+++ b/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
@@ -4,7 +4,7 @@ org.apache.spark.sql.execution.datasources.json.JsonFileFormat
 org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
 org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 org.apache.spark.sql.execution.datasources.text.TextFileFormat
-org.apache.spark.sql.execution.streaming.ConsoleSinkProvider
+org.apache.spark.sql.execution.streaming.sources.ConsoleSinkProvider
 org.apache.spark.sql.execution.streaming.TextSocketSourceProvider
 org.apache.spark.sql.execution.streaming.RateSourceProvider
 org.apache.spark.sql.execution.streaming.sources.RateSourceProviderV2
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWriter.scala
deleted file mode 100644
index 1a7275c1c4ffc..0000000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWriter.scala
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.streaming.sources
-
-import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{Row, SparkSession}
-import org.apache.spark.sql.sources.v2.DataSourceV2Options
-import org.apache.spark.sql.sources.v2.writer.{DataSourceV2Writer, DataWriterFactory, WriterCommitMessage}
-import org.apache.spark.sql.types.StructType
-
-class ConsoleWriter(batchId: Long, schema: StructType, options: DataSourceV2Options)
-    extends DataSourceV2Writer with Logging {
-  // Number of rows to display, by default 20 rows
-  private val numRowsToShow = options.getInt("numRows", 20)
-
-  // Truncate the displayed data if it is too long, by default it is true
-  private val isTruncated = options.getBoolean("truncate", true)
-
-  assert(SparkSession.getActiveSession.isDefined)
-  private val spark = SparkSession.getActiveSession.get
-
-  override def createWriterFactory(): DataWriterFactory[Row] = PackedRowWriterFactory
-
-  override def commit(messages: Array[WriterCommitMessage]): Unit = synchronized {
-    val batch = messages.collect {
-      case PackedRowCommitMessage(rows) => rows
-    }.fold(Array())(_ ++ _)
-
-    // scalastyle:off println
-    println("-------------------------------------------")
-    println(s"Batch: $batchId")
-    println("-------------------------------------------")
-    // scalastyle:off println
-    spark.createDataFrame(
-      spark.sparkContext.parallelize(batch), schema)
-      .show(numRowsToShow, isTruncated)
-  }
-
-  override def abort(messages: Array[WriterCommitMessage]): Unit = {}
-
-  override def toString(): String = s"ConsoleWriter[numRows=$numRowsToShow, truncate=$isTruncated]"
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/console.scala
similarity index 61%
rename from sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala
rename to sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/console.scala
index 94820376ff7e7..7eb454262a3c2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/console.scala
@@ -15,21 +15,52 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.execution.streaming
+package org.apache.spark.sql.execution.streaming.sources
 
 import java.util.Optional
 
-import scala.collection.JavaConverters._
-
+import org.apache.spark.internal.Logging
 import org.apache.spark.sql._
-import org.apache.spark.sql.execution.streaming.sources.ConsoleWriter
 import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister}
 import org.apache.spark.sql.sources.v2.{DataSourceV2, DataSourceV2Options}
 import org.apache.spark.sql.sources.v2.streaming.MicroBatchWriteSupport
-import org.apache.spark.sql.sources.v2.writer.DataSourceV2Writer
+import org.apache.spark.sql.sources.v2.writer.{DataSourceV2Writer, DataWriterFactory, WriterCommitMessage}
 import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types.StructType
 
+class ConsoleWriter(batchId: Long, schema: StructType, options: DataSourceV2Options)
+    extends DataSourceV2Writer with Logging {
+  // Number of rows to display, by default 20 rows
+  private val numRowsToShow = options.getInt("numRows", 20)
+
+  // Truncate the displayed data if it is too long, by default it is true
+  private val isTruncated = options.getBoolean("truncate", true)
+
+  assert(SparkSession.getActiveSession.isDefined)
+  private val spark = SparkSession.getActiveSession.get
+
+  override def createWriterFactory(): DataWriterFactory[Row] = PackedRowWriterFactory
+
+  override def commit(messages: Array[WriterCommitMessage]): Unit = synchronized {
+    val batch = messages.collect {
+      case PackedRowCommitMessage(rows) => rows
+    }.fold(Array())(_ ++ _)
+
+    // scalastyle:off println
+    println("-------------------------------------------")
+    println(s"Batch: $batchId")
+    println("-------------------------------------------")
+    // scalastyle:off println
+    spark.createDataFrame(
+      spark.sparkContext.parallelize(batch), schema)
+      .show(numRowsToShow, isTruncated)
+  }
+
+  override def abort(messages: Array[WriterCommitMessage]): Unit = {}
+
+  override def toString(): String = s"ConsoleWriter[numRows=$numRowsToShow, truncate=$isTruncated]"
+}
+
 case class ConsoleRelation(override val sqlContext: SQLContext, data: DataFrame) extends BaseRelation {
   override def schema: StructType = data.schema