This repository has been archived by the owner on Feb 14, 2022. It is now read-only.

Feature/apache arrow [WIP] #8

Draft: wants to merge 3 commits into master
3 changes: 1 addition & 2 deletions build.sbt
@@ -9,13 +9,12 @@ import xerial.sbt.Sonatype._
val circeV = "0.11.1"
val scalafxV = "8.0.144-R12"
val scalatestV = "3.0.4"
val spark2V = "2.4.0"
val vegaliteV = "2.6.0"
val vegaV = "3.3.1"
val vegaembedV = "3.29.1"
val scalaloggingV = "3.9.0"
val slf4jV = "1.7.16"
val sparkV = "2.4.0"
val sparkV = "2.4.3"

lazy val global = project
.in(file("."))
@@ -24,7 +24,7 @@ import scala.collection.immutable.Stream.cons
*/
object ClasspathJarResourceProvider {

def apply(): LibraryProvider = {
def apply(additionalLibraryFilenames: String*): LibraryProvider = {
val streams = findResourcesByFilename(
List("vega.min.js", "vega-lite.min.js", "vega-embed.min.js")
)
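
A possible usage sketch of the widened factory method (not part of this diff; the extra resource filenames are hypothetical and would have to exist as classpath resources):

import com.coxautodata.vegalite4s.providers.{ClasspathJarResourceProvider, LibraryProvider}

// Bundle the Arrow loader scripts alongside the default Vega libraries.
val provider: LibraryProvider =
  ClasspathJarResourceProvider("apache-arrow.min.js", "vega-loader-arrow.min.js")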
@@ -4,19 +4,23 @@ package com.coxautodata.vegalite4s.providers
* VegaLite Javascript dependency provider that references the Vega
* dependency by providing a URL to the Jsdelivr CDN.
*
* @param vegaVersion Version of Vega to reference
* @param vegaLiteVersion Version of Vega-lite to reference
* @param vegaVersion Version of Vega to reference
* @param vegaLiteVersion Version of Vega-lite to reference
* @param vegaEmbedVersion Version of Vega-embed to reference
*/
case class JsdelivrProvider(vegaVersion: String,
vegaLiteVersion: String,
vegaEmbedVersion: String) extends LibraryProvider {
vegaEmbedVersion: String,
additionalLibraries: Seq[(String, String)] = Seq.empty
) extends LibraryProvider {

override def getJavascriptLibraryURLs: Seq[String] =
override def getJavascriptLibraryURLs: Seq[String] = {
Seq(
s"https://cdn.jsdelivr.net/npm/vega@$vegaVersion",
s"https://cdn.jsdelivr.net/npm/vega-lite@$vegaLiteVersion",
s"https://cdn.jsdelivr.net/npm/vega-embed@$vegaEmbedVersion"
)
"vega" -> vegaVersion,
"vega-lite" -> vegaLiteVersion,
"vega-embed" -> vegaEmbedVersion
) ++ additionalLibraries
}.map { case (k, v) => s"https://cdn.jsdelivr.net/npm/$k@$v" }


}
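
For reference, a sketch of how the reworked provider expands its (library, version) pairs into CDN URLs; the apache-arrow and vega-loader-arrow versions mirror the defaults wired into VegaLiteLatestCDNVersion further down in this diff:

import com.coxautodata.vegalite4s.providers.JsdelivrProvider

val provider = JsdelivrProvider(
  "5", "3", "4",
  additionalLibraries = Seq("apache-arrow" -> "0.14.0", "vega-loader-arrow" -> "0.0.7")
)

provider.getJavascriptLibraryURLs
// Seq(
//   "https://cdn.jsdelivr.net/npm/vega@5",
//   "https://cdn.jsdelivr.net/npm/vega-lite@3",
//   "https://cdn.jsdelivr.net/npm/vega-embed@4",
//   "https://cdn.jsdelivr.net/npm/apache-arrow@0.14.0",
//   "https://cdn.jsdelivr.net/npm/vega-loader-arrow@0.0.7"
// )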
@@ -12,11 +12,11 @@ object LocalFileProvider {
def apply(vegaLiteSchemaVersion: String,
vegaPath: String,
vegaLitePath: String,
vegaEmbedPath: String): LibraryProvider =
vegaEmbedPath: String,
additionalPaths: String*): LibraryProvider =
new InputStreamProvider(
new FileInputStream(vegaPath),
new FileInputStream(vegaLitePath),
new FileInputStream(vegaEmbedPath)
(List(vegaPath, vegaLitePath, vegaEmbedPath) ++ additionalPaths)
.map(new FileInputStream(_)): _*
)

}
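
A sketch of the new varargs overload in use, streaming two extra local bundles after the three mandatory libraries (paths are illustrative):

import com.coxautodata.vegalite4s.providers.{LibraryProvider, LocalFileProvider}

// All arguments positional: schema version, vega, vega-lite, vega-embed,
// then any number of additional script paths.
val provider: LibraryProvider = LocalFileProvider(
  "3",
  "/opt/js/vega.min.js",
  "/opt/js/vega-lite.min.js",
  "/opt/js/vega-embed.min.js",
  "/opt/js/apache-arrow.min.js",
  "/opt/js/vega-loader-arrow.min.js"
)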
@@ -8,7 +8,7 @@ package object providers {
* A Vega dependency provider that references the latest available VegaLite library
* available on the Jsdelivr CDN
*/
object VegaLiteLatestCDNVersion extends VegaLiteProvider(JsdelivrProvider("5", "3", "4"), "3")
object VegaLiteLatestCDNVersion extends VegaLiteProvider(JsdelivrProvider("5", "3", "4", Seq("apache-arrow" -> "0.14.0", "vega-loader-arrow" -> "0.0.7")), "3")

/**
* A Vega dependency provider that references the latest available Vega library
@@ -0,0 +1,47 @@
package com.coxautodata.arrow

import java.nio.channels.Channels

import org.apache.arrow.vector.VectorSchemaRoot
import org.apache.arrow.vector.dictionary.DictionaryProvider
import org.apache.arrow.vector.ipc.ArrowFileWriter
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.TaskAttemptContext
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.arrow.{ArrowUtils, ArrowWriter}
import org.apache.spark.sql.execution.datasources.OutputWriter
import org.apache.spark.sql.types.StructType

class ArrowOutputWriter(path: String, context: TaskAttemptContext, schema: StructType, maxRowsPerBatch: Long, timeZoneId: String) extends OutputWriter {


private val outputPath = new Path(path)
private val outputStream = outputPath.getFileSystem(context.getConfiguration).create(outputPath)
private val outputChannel = Channels.newChannel(outputStream)
private val arrowSchema = ArrowUtils.toArrowSchema(schema, timeZoneId)
private val vectorSchema = VectorSchemaRoot.create(arrowSchema, ArrowUtils.rootAllocator)
private val rootWriter = ArrowWriter.create(vectorSchema)
private val dictionaryProvider = new DictionaryProvider.MapDictionaryProvider()
private val fileWriter = new ArrowFileWriter(vectorSchema, dictionaryProvider, outputChannel)
private var count: Long = 0L

// https://github.com/animeshtrivedi/blog/blob/master/post/2017-12-26-arrow.md
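// write() appends rows to the in-memory Arrow vectors; once maxRowsPerBatch rows
// have accumulated, commitBatch() flushes them to the file as a single Arrow
// record batch and resets the buffer so the next batch starts empty.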

private def commitBatch(): Unit = {
rootWriter.finish()
fileWriter.writeBatch()
rootWriter.reset()
count = 0
}

override def write(row: InternalRow): Unit = {
rootWriter.write(row)
count += 1
if (count >= maxRowsPerBatch) commitBatch()
}

override def close(): Unit = {
if (count != 0) commitBatch()
fileWriter.end() // write the Arrow file footer so the output is readable as an Arrow file
outputChannel.close()
}
}
@@ -0,0 +1,13 @@
package com.coxautodata.arrow

import org.apache.hadoop.mapreduce.TaskAttemptContext
import org.apache.spark.sql.execution.datasources.OutputWriterFactory
import org.apache.spark.sql.types.StructType

class ArrowOutputWriterFactor(maxRowsPerBatch: Long, timeZoneId: String) extends OutputWriterFactory {
override def getFileExtension(context: TaskAttemptContext): String = ".arrow"

override def newInstance(path: String, dataSchema: StructType, context: TaskAttemptContext): ArrowOutputWriter = {
new ArrowOutputWriter(path, context, dataSchema, maxRowsPerBatch, timeZoneId)
}
}
22 changes: 22 additions & 0 deletions spark2/src/main/scala/com/coxautodata/arrow/DefaultSource.scala
@@ -0,0 +1,22 @@
package com.coxautodata.arrow

import org.apache.hadoop.fs.FileStatus
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.datasources.{FileFormat, OutputWriterFactory}
import org.apache.spark.sql.types.StructType

private[arrow] class DefaultSource extends FileFormat {
override def inferSchema(sparkSession: SparkSession, options: Map[String, String], files: Seq[FileStatus]): Option[StructType] =
throw new UnsupportedOperationException(s"Read not yet supported")

override def prepareWrite(sparkSession: SparkSession, job: Job, options: Map[String, String], dataSchema: StructType): OutputWriterFactory = {

//https://spark.apache.org/docs/latest/sql-pyspark-pandas-with-arrow.html#timestamp-with-time-zone-semantics
val maxRecordsPerBatch = options.getOrElse("maxRecordsPerBatch", "10000").toInt
val timeZoneId = options.getOrElse("timeZoneId", "UTC")
new ArrowOutputWriterFactor(maxRecordsPerBatch, timeZoneId)

}

}
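
A usage sketch of the new file format (not part of this diff), exercising the two options read in prepareWrite; the data and output path are illustrative:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[1]").getOrCreate()
import spark.implicits._

val df = (1 to 100).map(i => (i.toString, i)).toDF("label", "value")

df.coalesce(1)
  .write
  .format("com.coxautodata.arrow")
  .option("maxRecordsPerBatch", "50000") // default 10000
  .option("timeZoneId", "UTC")           // default UTC
  .save("/tmp/arrow-out")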
11 changes: 11 additions & 0 deletions spark2/src/main/scala/com/coxautodata/arrow/package.scala
@@ -0,0 +1,11 @@
package com.coxautodata

import org.apache.spark.sql.DataFrameWriter

package object arrow {

implicit class ArrowDataFrameWriter[T](writer: DataFrameWriter[T]) {
def arrow: String => Unit = writer.format("com.coxautodata.arrow").save
}

}
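
With the implicit in scope, the same write becomes (assuming the df from the previous sketch):

import com.coxautodata.arrow._

df.coalesce(1).write.arrow("/tmp/arrow-out") // equivalent to .format("com.coxautodata.arrow").save(...)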
@@ -1,7 +1,11 @@
package com.coxautodata.vegalite4s.spark

import com.coxautodata.arrow._
import com.coxautodata.vegalite4s.PlotHelpers._
import com.coxautodata.vegalite4s.spark.filestore.{DatabricksFileStore, StaticFileStore}
import com.coxautodata.vegalite4s.{SpecConstruct, VegaLite}
import io.circe.{Json, JsonObject}
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.types.{NumericType, StructField, StructType}
import org.apache.spark.sql.{Dataset, SparkSession}

@@ -96,6 +100,46 @@ object PlotHelpers {
* @param ds Dataset to add
*/
def withData(ds: Dataset[_]): T = {
val tryArrow = ds.sparkSession.conf.getOption(SPARK_ATTEMPT_TO_STORE_AS_STATIC_ARROW_FILE).map(_.toBoolean).getOrElse(SPARK_ATTEMPT_TO_STORE_AS_STATIC_ARROW_FILE_DEFAULT)
val candidates = if (tryArrow) List(DatabricksFileStore) else List.empty
candidates
.find(_.staticStoreExists(ds.sparkSession))
.map(withArrowData(ds))
.getOrElse(withEmbeddedData(ds))
}

private [spark] def withArrowData(ds: Dataset[_])(staticStore: StaticFileStore): T = {

lazy val url = {
val outputPath = staticStore.generateOutputPath()

ds.coalesce(1).write.arrow(outputPath.toString)

val arrowFiles = outputPath
.getFileSystem(ds.sparkSession.sparkContext.hadoopConfiguration)
.globStatus(new Path(outputPath, "*.arrow"))
.collect { case f if f.isFile => f.getPath }

arrowFiles.toList match {
case List(one) => staticStore.getStaticURL(one)
case Nil => throw new RuntimeException(s"No arrow files found in [$outputPath]")
case _ => throw new RuntimeException(s"Multiple arrow files found in [$outputPath]")
}
}

spec.withField(
"data",
Json.fromJsonObject(
JsonObject(
"url" -> Json.fromString(url),
"format" -> Json.fromJsonObject(JsonObject("type" -> Json.fromString("arrow")))
)
)
)

}

private def withEmbeddedData(ds: Dataset[_]): T = {
def toSeqMap: Seq[Map[String, Any]] = {
val arr = getCollectLimit(ds.sparkSession) match {
case Some(limit) =>
@@ -128,4 +172,5 @@ object PlotHelpers {
schema.fields.filter(_.dataType.isInstanceOf[NumericType])
}


}
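
A sketch of the resulting behaviour (assuming the spark and df from the earlier sketches): when spark.vegalite4s.tryStoreAsArrow is enabled, the default, and a supported static store is detected, withData writes the Dataset as an Arrow file and references it by URL; otherwise it falls back to embedding the collected rows in the spec, as before. The flag can be disabled explicitly:

import com.coxautodata.vegalite4s.VegaLite
import com.coxautodata.vegalite4s.spark.PlotHelpers._

// Force the pre-existing embedded-data path, e.g. when not running on Databricks.
spark.conf.set("spark.vegalite4s.tryStoreAsArrow", "false")

VegaLite()
  .withData(df)       // rows are collected and embedded as JSON values
  .toJson(_.spaces2)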
@@ -0,0 +1,24 @@
package com.coxautodata.vegalite4s.spark.filestore

import java.util.UUID

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession

import scala.util.Try


object DatabricksFileStore extends StaticFileStore {

val root: Path = new Path("dbfs:/FileStore")

override def staticStoreExists(sparkSession: SparkSession): Boolean = Try {
root
.getFileSystem(sparkSession.sparkContext.hadoopConfiguration)
.exists(root)
}.getOrElse(false)

override def generateOutputPath(): Path = new Path(root, s"vegalite4s/${UUID.randomUUID()}")

override def getStaticURL(arrowFile: Path): String = new Path("files", root.toUri.relativize(arrowFile.toUri).getPath).toString
}
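
The URL mapping above strips the dbfs:/FileStore prefix and re-roots the path under files/, which matches how Databricks serves FileStore contents and is mirrored by the spec at the end of this diff. For illustration:

import org.apache.hadoop.fs.Path

DatabricksFileStore.getStaticURL(new Path("dbfs:/FileStore/vegalite4s/1234/part-0.arrow"))
// == "files/vegalite4s/1234/part-0.arrow"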
@@ -0,0 +1,15 @@
package com.coxautodata.vegalite4s.spark.filestore

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession



trait StaticFileStore {

def staticStoreExists(sparkSession: SparkSession): Boolean

def generateOutputPath(): Path

def getStaticURL(arrowFile: Path): String
}
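
A minimal sketch of a custom implementation targeting a local directory, along the lines of the anonymous store used in the tests further down (the root path is illustrative):

import java.util.UUID

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession

object LocalTmpFileStore extends StaticFileStore {

  private val root = new Path("file:/tmp/vegalite4s")

  // Assume the local filesystem is always available.
  override def staticStoreExists(sparkSession: SparkSession): Boolean = true

  // A fresh working directory per plot, mirroring DatabricksFileStore.
  override def generateOutputPath(): Path = new Path(root, UUID.randomUUID().toString)

  // Serve the written file directly via its file: URL.
  override def getStaticURL(arrowFile: Path): String = arrowFile.toString
}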
@@ -8,4 +8,7 @@ package object spark {
val SPARK_DATASET_MAX_RECORDS = "spark.vegalite4s.maxRowsToCollect"
val SPARK_DATASET_MAX_RECORDS_DEFAULT: Long = 10000

val SPARK_ATTEMPT_TO_STORE_AS_STATIC_ARROW_FILE = "spark.vegalite4s.tryStoreAsArrow"
val SPARK_ATTEMPT_TO_STORE_AS_STATIC_ARROW_FILE_DEFAULT: Boolean = true

}
@@ -1,9 +1,14 @@
package com.coxautodata.vegalite4s.spark

import java.io.File
import java.nio.file.Files
import java.sql.{Date, Timestamp}

import com.coxautodata.vegalite4s.VegaLite
import com.coxautodata.vegalite4s.spark.PlotHelpers._
import com.coxautodata.vegalite4s.spark.filestore.StaticFileStore
import org.apache.commons.io.FileUtils
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.scalatest.{FunSpec, Matchers}

@@ -368,7 +373,7 @@ class PlotHelpersSpec extends FunSpec with Matchers {
import spark.implicits._

val data: DataFrame =
(97 to 122).map(i => TestScatterRecord(i, i*i)).toDF()
(97 to 122).map(i => TestScatterRecord(i, i * i)).toDF()

val plot = VegaLite()
.scatter(data)
@@ -514,6 +519,45 @@ class PlotHelpersSpec extends FunSpec with Matchers {
|}""".stripMargin)
}


it("withArrowData") {
val temp = Files.createTempDirectory("test_output")
val spark = SparkSession.builder().master("local[1]").getOrCreate()
import spark.implicits._

val testStore = new StaticFileStore {
override def staticStoreExists(sparkSession: SparkSession): Boolean = true

override def generateOutputPath(): Path = new Path(temp.toString, "arrowout")

override def getStaticURL(arrowFile: Path): String = arrowFile.toString
}

val data: DataFrame =
(97 to 122).map(i => TestRecord(i.toChar.toString, i)).toDF()


VegaLite()
.withArrowData(data)(testStore)
.toJson(_.spaces2) should be(
s"""{
| "$$schema" : "https://vega.github.io/schema/vega-lite/v3.json",
| "data" : {
| "url" : "file:${new File(testStore.generateOutputPath().toString).listFiles().map(_.toString).filter(_.endsWith(".arrow")).mkString}",
| "format" : {
| "type" : "arrow"
| }
| }
|}""".stripMargin
)


spark.stop()

FileUtils.deleteDirectory(temp.toFile)

}

}

case class TestRecord(a: String, b: Int)
@@ -0,0 +1,20 @@
package com.coxautodata.vegalite4s.spark.filestore

import org.apache.hadoop.fs.Path
import org.scalatest.{FunSpec, Matchers}

class DatabricksFileStoreSpec extends FunSpec with Matchers {

it("generate a new working directory for each call"){

DatabricksFileStore.generateOutputPath() should not be DatabricksFileStore.generateOutputPath()

}

it("generate a relative url pointing to a databricks filestore"){

DatabricksFileStore.getStaticURL(new Path("dbfs:/FileStore/test/test.arrow")) should be ("files/test/test.arrow")

}

}