Skip to content

Commit

Permalink
merge into console.scala
Browse files Browse the repository at this point in the history
  • Loading branch information
jose-torres committed Jan 17, 2018
1 parent fac17a4 commit 8ce6f38
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ org.apache.spark.sql.execution.datasources.json.JsonFileFormat
org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
org.apache.spark.sql.execution.datasources.text.TextFileFormat
org.apache.spark.sql.execution.streaming.ConsoleSinkProvider
org.apache.spark.sql.execution.streaming.sources.ConsoleSinkProvider
org.apache.spark.sql.execution.streaming.TextSocketSourceProvider
org.apache.spark.sql.execution.streaming.RateSourceProvider
org.apache.spark.sql.execution.streaming.sources.RateSourceProviderV2

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,52 @@
* limitations under the License.
*/

package org.apache.spark.sql.execution.streaming
package org.apache.spark.sql.execution.streaming.sources

import java.util.Optional

import scala.collection.JavaConverters._

import org.apache.spark.internal.Logging
import org.apache.spark.sql._
import org.apache.spark.sql.execution.streaming.sources.ConsoleWriter
import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister}
import org.apache.spark.sql.sources.v2.{DataSourceV2, DataSourceV2Options}
import org.apache.spark.sql.sources.v2.streaming.MicroBatchWriteSupport
import org.apache.spark.sql.sources.v2.writer.DataSourceV2Writer
import org.apache.spark.sql.sources.v2.writer.{DataSourceV2Writer, DataWriterFactory, WriterCommitMessage}
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.StructType

class ConsoleWriter(batchId: Long, schema: StructType, options: DataSourceV2Options)
extends DataSourceV2Writer with Logging {
// Number of rows to display, by default 20 rows
private val numRowsToShow = options.getInt("numRows", 20)

// Truncate the displayed data if it is too long, by default it is true
private val isTruncated = options.getBoolean("truncate", true)

assert(SparkSession.getActiveSession.isDefined)
private val spark = SparkSession.getActiveSession.get

override def createWriterFactory(): DataWriterFactory[Row] = PackedRowWriterFactory

override def commit(messages: Array[WriterCommitMessage]): Unit = synchronized {
val batch = messages.collect {
case PackedRowCommitMessage(rows) => rows
}.fold(Array())(_ ++ _)

// scalastyle:off println
println("-------------------------------------------")
println(s"Batch: $batchId")
println("-------------------------------------------")
// scalastyle:off println
spark.createDataFrame(
spark.sparkContext.parallelize(batch), schema)
.show(numRowsToShow, isTruncated)
}

override def abort(messages: Array[WriterCommitMessage]): Unit = {}

override def toString(): String = s"ConsoleWriter[numRows=$numRowsToShow, truncate=$isTruncated]"
}

case class ConsoleRelation(override val sqlContext: SQLContext, data: DataFrame)
extends BaseRelation {
override def schema: StructType = data.schema
Expand Down

0 comments on commit 8ce6f38

Please sign in to comment.