diff --git a/build.sbt b/build.sbt
index fafc06b..b4893f9 100644
--- a/build.sbt
+++ b/build.sbt
@@ -9,13 +9,12 @@ import xerial.sbt.Sonatype._
 val circeV = "0.11.1"
 val scalafxV = "8.0.144-R12"
 val scalatestV = "3.0.4"
-val spark2V = "2.4.0"
 val vegaliteV = "2.6.0"
 val vegaV = "3.3.1"
 val vegaembedV = "3.29.1"
 val scalaloggingV = "3.9.0"
 val slf4jV = "1.7.16"
-val sparkV = "2.4.0"
+val sparkV = "2.4.3"
 
 lazy val global = project
   .in(file("."))
diff --git a/core/src/main/scala/com/coxautodata/vegalite4s/providers/ClasspathJarResourceProvider.scala b/core/src/main/scala/com/coxautodata/vegalite4s/providers/ClasspathJarResourceProvider.scala
index 001f483..8b9f240 100644
--- a/core/src/main/scala/com/coxautodata/vegalite4s/providers/ClasspathJarResourceProvider.scala
+++ b/core/src/main/scala/com/coxautodata/vegalite4s/providers/ClasspathJarResourceProvider.scala
@@ -24,7 +24,7 @@ import scala.collection.immutable.Stream.cons
  */
 object ClasspathJarResourceProvider {
 
-  def apply(): LibraryProvider = {
+  def apply(additionalLibraryFilenames: String*): LibraryProvider = {
     val streams = findResourcesByFilename(
-      List("vega.min.js", "vega-lite.min.js", "vega-embed.min.js")
+      List("vega.min.js", "vega-lite.min.js", "vega-embed.min.js") ++ additionalLibraryFilenames
     )
diff --git a/core/src/main/scala/com/coxautodata/vegalite4s/providers/JsdelivrProvider.scala b/core/src/main/scala/com/coxautodata/vegalite4s/providers/JsdelivrProvider.scala
index b651f70..3a5f27b 100644
--- a/core/src/main/scala/com/coxautodata/vegalite4s/providers/JsdelivrProvider.scala
+++ b/core/src/main/scala/com/coxautodata/vegalite4s/providers/JsdelivrProvider.scala
@@ -4,19 +4,24 @@ package com.coxautodata.vegalite4s.providers
  * VegaLite Javascript dependency provider that references the Vega
  * dependency by providing a URL to the Jsdelivr CDN.
  *
- * @param vegaVersion Version of Vega to reference
- * @param vegaLiteVersion Version of Vega-lite to reference
+ * @param vegaVersion      Version of Vega to reference
+ * @param vegaLiteVersion  Version of Vega-lite to reference
  * @param vegaEmbedVersion Version of Vega-embed to reference
+ * @param additionalLibraries Additional (package, version) pairs to reference from the CDN
  */
 case class JsdelivrProvider(vegaVersion: String,
                             vegaLiteVersion: String,
-                            vegaEmbedVersion: String) extends LibraryProvider {
+                            vegaEmbedVersion: String,
+                            additionalLibraries: Seq[(String, String)] = Seq.empty
+                           ) extends LibraryProvider {
 
-  override def getJavascriptLibraryURLs: Seq[String] =
+  override def getJavascriptLibraryURLs: Seq[String] = {
     Seq(
-      s"https://cdn.jsdelivr.net/npm/vega@$vegaVersion",
-      s"https://cdn.jsdelivr.net/npm/vega-lite@$vegaLiteVersion",
-      s"https://cdn.jsdelivr.net/npm/vega-embed@$vegaEmbedVersion"
-    )
+      "vega" -> vegaVersion,
+      "vega-lite" -> vegaLiteVersion,
+      "vega-embed" -> vegaEmbedVersion
+    ) ++ additionalLibraries
+  }.map { case (k, v) => s"https://cdn.jsdelivr.net/npm/$k@$v" }
+
 }
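Review note: with `getJavascriptLibraryURLs` now building `(package, version)` pairs and mapping them all onto Jsdelivr URLs, additional libraries are emitted after the core three, in declaration order. A minimal sketch of the expected behaviour; the extra library and version numbers here are illustrative, not mandated by this diff:

```scala
import com.coxautodata.vegalite4s.providers.JsdelivrProvider

val provider = JsdelivrProvider(
  "5", "3", "4",
  additionalLibraries = Seq("vega-loader-arrow" -> "0.0.7")
)

// Expected, in order:
//   https://cdn.jsdelivr.net/npm/vega@5
//   https://cdn.jsdelivr.net/npm/vega-lite@3
//   https://cdn.jsdelivr.net/npm/vega-embed@4
//   https://cdn.jsdelivr.net/npm/vega-loader-arrow@0.0.7
provider.getJavascriptLibraryURLs.foreach(println)
```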
diff --git a/core/src/main/scala/com/coxautodata/vegalite4s/providers/LocalFileProvider.scala b/core/src/main/scala/com/coxautodata/vegalite4s/providers/LocalFileProvider.scala
index a06f4a1..43e3b6c 100644
--- a/core/src/main/scala/com/coxautodata/vegalite4s/providers/LocalFileProvider.scala
+++ b/core/src/main/scala/com/coxautodata/vegalite4s/providers/LocalFileProvider.scala
@@ -12,11 +12,11 @@ object LocalFileProvider {
 
   def apply(vegaLiteSchemaVersion: String,
             vegaPath: String,
             vegaLitePath: String,
-            vegaEmbedPath: String): LibraryProvider =
+            vegaEmbedPath: String,
+            additionalPaths: String*): LibraryProvider =
     new InputStreamProvider(
-      new FileInputStream(vegaPath),
-      new FileInputStream(vegaLitePath),
-      new FileInputStream(vegaEmbedPath)
+      (List(vegaPath, vegaLitePath, vegaEmbedPath) ++ additionalPaths)
+        .map(new FileInputStream(_)): _*
     )
 }
diff --git a/core/src/main/scala/com/coxautodata/vegalite4s/providers/package.scala b/core/src/main/scala/com/coxautodata/vegalite4s/providers/package.scala
index 5f9058e..3d0990e 100644
--- a/core/src/main/scala/com/coxautodata/vegalite4s/providers/package.scala
+++ b/core/src/main/scala/com/coxautodata/vegalite4s/providers/package.scala
@@ -8,7 +8,7 @@ package object providers {
    * A Vega dependency provider that references the latest available VegaLite library
    * available on the Jsdelivr CDN
    */
-  object VegaLiteLatestCDNVersion extends VegaLiteProvider(JsdelivrProvider("5", "3", "4"), "3")
+  object VegaLiteLatestCDNVersion extends VegaLiteProvider(JsdelivrProvider("5", "3", "4", Seq("apache-arrow" -> "0.14.0", "vega-loader-arrow" -> "0.0.7")), "3")
 
   /**
    * A Vega dependency provider that references the latest available Vega library
diff --git a/spark2/src/main/scala/com/coxautodata/arrow/ArrowOutputWriter.scala b/spark2/src/main/scala/com/coxautodata/arrow/ArrowOutputWriter.scala
new file mode 100644
index 0000000..7369347
--- /dev/null
+++ b/spark2/src/main/scala/com/coxautodata/arrow/ArrowOutputWriter.scala
@@ -0,0 +1,47 @@
+package com.coxautodata.arrow
+
+import java.nio.channels.Channels
+
+import org.apache.arrow.vector.VectorSchemaRoot
+import org.apache.arrow.vector.dictionary.DictionaryProvider
+import org.apache.arrow.vector.ipc.ArrowFileWriter
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapreduce.TaskAttemptContext
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.arrow.{ArrowUtils, ArrowWriter}
+import org.apache.spark.sql.execution.datasources.OutputWriter
+import org.apache.spark.sql.types.StructType
+
+class ArrowOutputWriter(path: String, context: TaskAttemptContext, schema: StructType, maxRowsPerBatch: Long, timeZoneId: String) extends OutputWriter {
+
+  private val outputPath = new Path(path)
+  private val outputStream = outputPath.getFileSystem(context.getConfiguration).create(outputPath)
+  private val outputChannel = Channels.newChannel(outputStream)
+  private val arrowSchema = ArrowUtils.toArrowSchema(schema, timeZoneId)
+  private val vectorSchema = VectorSchemaRoot.create(arrowSchema, ArrowUtils.rootAllocator)
+  private val rootWriter = ArrowWriter.create(vectorSchema)
+  private val dictionaryProvider = new DictionaryProvider.MapDictionaryProvider()
+  private val fileWriter = new ArrowFileWriter(vectorSchema, dictionaryProvider, outputChannel)
+  private var count: Long = 0L
+
+  // https://github.com/animeshtrivedi/blog/blob/master/post/2017-12-26-arrow.md
+
+  private def commitBatch(): Unit = {
+    rootWriter.finish()
+    fileWriter.writeBatch()
+    rootWriter.reset()
+    count = 0
+  }
+
+  override def write(row: InternalRow): Unit = {
+    rootWriter.write(row)
+    count += 1
+    if (count >= maxRowsPerBatch) commitBatch()
+  }
+
+  override def close(): Unit = {
+    if (count != 0) commitBatch()
+    fileWriter.close() // ends the file (writes the Arrow footer) before the channel is closed
+    outputChannel.close()
+  }
+}
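Review note: `ArrowFileWriter` only emits the Arrow file footer when it is ended and closed, which `close()` above does after flushing any final partial batch; without the footer, file-format readers reject the output. A quick way to sanity-check a produced file, sketched with Arrow's Java file reader (the path is illustrative):

```scala
import java.io.FileInputStream

import org.apache.arrow.memory.RootAllocator
import org.apache.arrow.vector.ipc.ArrowFileReader

val allocator = new RootAllocator(Long.MaxValue)
// FileChannel is seekable, which the file (as opposed to stream) reader requires
val reader = new ArrowFileReader(new FileInputStream("/tmp/part-00000.arrow").getChannel, allocator)
val root = reader.getVectorSchemaRoot
while (reader.loadNextBatch()) {
  println(s"record batch: ${root.getRowCount} rows") // one batch per commitBatch() call
}
reader.close()
```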
diff --git a/spark2/src/main/scala/com/coxautodata/arrow/ArrowOutputWriterFactory.scala b/spark2/src/main/scala/com/coxautodata/arrow/ArrowOutputWriterFactory.scala
new file mode 100644
index 0000000..20011de
--- /dev/null
+++ b/spark2/src/main/scala/com/coxautodata/arrow/ArrowOutputWriterFactory.scala
@@ -0,0 +1,13 @@
+package com.coxautodata.arrow
+
+import org.apache.hadoop.mapreduce.TaskAttemptContext
+import org.apache.spark.sql.execution.datasources.OutputWriterFactory
+import org.apache.spark.sql.types.StructType
+
+class ArrowOutputWriterFactory(maxRowsPerBatch: Long, timeZoneId: String) extends OutputWriterFactory {
+  override def getFileExtension(context: TaskAttemptContext): String = ".arrow"
+
+  override def newInstance(path: String, dataSchema: StructType, context: TaskAttemptContext): ArrowOutputWriter = {
+    new ArrowOutputWriter(path, context, dataSchema, maxRowsPerBatch, timeZoneId)
+  }
+}
diff --git a/spark2/src/main/scala/com/coxautodata/arrow/DefaultSource.scala b/spark2/src/main/scala/com/coxautodata/arrow/DefaultSource.scala
new file mode 100644
index 0000000..7724dae
--- /dev/null
+++ b/spark2/src/main/scala/com/coxautodata/arrow/DefaultSource.scala
@@ -0,0 +1,22 @@
+package com.coxautodata.arrow
+
+import org.apache.hadoop.fs.FileStatus
+import org.apache.hadoop.mapreduce.Job
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.execution.datasources.{FileFormat, OutputWriterFactory}
+import org.apache.spark.sql.types.StructType
+
+private[arrow] class DefaultSource extends FileFormat {
+  override def inferSchema(sparkSession: SparkSession, options: Map[String, String], files: Seq[FileStatus]): Option[StructType] =
+    throw new UnsupportedOperationException("Read not yet supported")
+
+  override def prepareWrite(sparkSession: SparkSession, job: Job, options: Map[String, String], dataSchema: StructType): OutputWriterFactory = {
+
+    // https://spark.apache.org/docs/latest/sql-pyspark-pandas-with-arrow.html#timestamp-with-time-zone-semantics
+    val maxRecordsPerBatch = options.getOrElse("maxRecordsPerBatch", "10000").toInt
+    val timeZoneId = options.getOrElse("timeZoneId", "UTC")
+    new ArrowOutputWriterFactory(maxRecordsPerBatch, timeZoneId)
+
+  }
+
+}
diff --git a/spark2/src/main/scala/com/coxautodata/arrow/package.scala b/spark2/src/main/scala/com/coxautodata/arrow/package.scala
new file mode 100644
index 0000000..7277736
--- /dev/null
+++ b/spark2/src/main/scala/com/coxautodata/arrow/package.scala
@@ -0,0 +1,11 @@
+package com.coxautodata
+
+import org.apache.spark.sql.DataFrameWriter
+
+package object arrow {
+
+  implicit class ArrowDataFrameWriter[T](writer: DataFrameWriter[T]) {
+    def arrow: String => Unit = writer.format("com.coxautodata.arrow").save
+  }
+
+}
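Review note: the package object gives `DataFrameWriter` a `csv`/`parquet`-style `arrow` verb; it returns `save` as a `String => Unit`, so `write.arrow(path)` reads like the built-in formats. End-to-end usage, sketched against an assumed local session; paths and option values are illustrative:

```scala
import com.coxautodata.arrow._
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[1]").getOrCreate()
import spark.implicits._

val df = (1 to 100).map(i => (i, i.toString)).toDF("id", "label")

// Plain write; DefaultSource falls back to 10000 rows per batch in UTC
df.coalesce(1).write.arrow("/tmp/vegalite4s-arrow-demo")

// The options read back in DefaultSource.prepareWrite can be set explicitly
df.coalesce(1)
  .write
  .option("maxRecordsPerBatch", "5000")
  .option("timeZoneId", "UTC")
  .arrow("/tmp/vegalite4s-arrow-demo-batched")
```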
diff --git a/spark2/src/main/scala/com/coxautodata/vegalite4s/spark/PlotHelpers.scala b/spark2/src/main/scala/com/coxautodata/vegalite4s/spark/PlotHelpers.scala
index e35d138..c9ab728 100644
--- a/spark2/src/main/scala/com/coxautodata/vegalite4s/spark/PlotHelpers.scala
+++ b/spark2/src/main/scala/com/coxautodata/vegalite4s/spark/PlotHelpers.scala
@@ -1,7 +1,11 @@
 package com.coxautodata.vegalite4s.spark
 
+import com.coxautodata.arrow._
 import com.coxautodata.vegalite4s.PlotHelpers._
+import com.coxautodata.vegalite4s.spark.filestore.{DatabricksFileStore, StaticFileStore}
 import com.coxautodata.vegalite4s.{SpecConstruct, VegaLite}
+import io.circe.{Json, JsonObject}
+import org.apache.hadoop.fs.Path
 import org.apache.spark.sql.types.{NumericType, StructField, StructType}
 import org.apache.spark.sql.{Dataset, SparkSession}
 
@@ -96,6 +100,46 @@
      * @param ds Dataset to add
      */
     def withData(ds: Dataset[_]): T = {
+      val tryArrow = ds.sparkSession.conf.getOption(SPARK_ATTEMPT_TO_STORE_AS_STATIC_ARROW_FILE).map(_.toBoolean).getOrElse(SPARK_ATTEMPT_TO_STORE_AS_STATIC_ARROW_FILE_DEFAULT)
+      val candidates = if (tryArrow) List(DatabricksFileStore) else List.empty
+      candidates
+        .find(_.staticStoreExists(ds.sparkSession))
+        .map(withArrowData(ds))
+        .getOrElse(withEmbeddedData(ds))
+    }
+
+    private[spark] def withArrowData(ds: Dataset[_])(staticStore: StaticFileStore): T = {
+
+      lazy val url = {
+        val outputPath = staticStore.generateOutputPath()
+
+        ds.coalesce(1).write.arrow(outputPath.toString)
+
+        val arrowFiles = outputPath
+          .getFileSystem(ds.sparkSession.sparkContext.hadoopConfiguration)
+          .globStatus(new Path(outputPath, "*.arrow"))
+          .collect { case f if f.isFile => f.getPath }
+
+        arrowFiles.toList match {
+          case List(one) => staticStore.getStaticURL(one)
+          case Nil => throw new RuntimeException(s"No arrow files found in [$outputPath]")
+          case _ => throw new RuntimeException(s"Multiple arrow files found in [$outputPath]")
+        }
+      }
+
+      spec.withField(
+        "data",
+        Json.fromJsonObject(
+          JsonObject(
+            "url" -> Json.fromString(url),
+            "format" -> Json.fromJsonObject(JsonObject("type" -> Json.fromString("arrow")))
+          )
+        )
+      )
+
+    }
+
+    private def withEmbeddedData(ds: Dataset[_]): T = {
       def toSeqMap: Seq[Map[String, Any]] = {
         val arr = getCollectLimit(ds.sparkSession) match {
           case Some(limit) =>
@@ -128,4 +172,5 @@
     schema.fields.filter(_.dataType.isInstanceOf[NumericType])
   }
 
+
 }
diff --git a/spark2/src/main/scala/com/coxautodata/vegalite4s/spark/filestore/DatabricksFileStore.scala b/spark2/src/main/scala/com/coxautodata/vegalite4s/spark/filestore/DatabricksFileStore.scala
new file mode 100644
index 0000000..aec4b27
--- /dev/null
+++ b/spark2/src/main/scala/com/coxautodata/vegalite4s/spark/filestore/DatabricksFileStore.scala
@@ -0,0 +1,23 @@
+package com.coxautodata.vegalite4s.spark.filestore
+
+import java.util.UUID
+
+import org.apache.hadoop.fs.Path
+import org.apache.spark.sql.SparkSession
+
+import scala.util.Try
+
+object DatabricksFileStore extends StaticFileStore {
+
+  val root: Path = new Path("dbfs:/FileStore")
+
+  override def staticStoreExists(sparkSession: SparkSession): Boolean = Try {
+    root
+      .getFileSystem(sparkSession.sparkContext.hadoopConfiguration)
+      .exists(root)
+  }.getOrElse(false)
+
+  override def generateOutputPath(): Path = new Path(root, s"vegalite4s/${UUID.randomUUID()}")
+
+  override def getStaticURL(arrowFile: Path): String = new Path("files", root.toUri.relativize(arrowFile.toUri).getPath).toString
+}
diff --git a/spark2/src/main/scala/com/coxautodata/vegalite4s/spark/filestore/StaticFileStore.scala b/spark2/src/main/scala/com/coxautodata/vegalite4s/spark/filestore/StaticFileStore.scala
new file mode 100644
index 0000000..45ab601
--- /dev/null
+++ b/spark2/src/main/scala/com/coxautodata/vegalite4s/spark/filestore/StaticFileStore.scala
@@ -0,0 +1,13 @@
+package com.coxautodata.vegalite4s.spark.filestore
+
+import org.apache.hadoop.fs.Path
+import org.apache.spark.sql.SparkSession
+
+trait StaticFileStore {
+
+  def staticStoreExists(sparkSession: SparkSession): Boolean
+
+  def generateOutputPath(): Path
+
+  def getStaticURL(arrowFile: Path): String
+}
diff --git a/spark2/src/main/scala/com/coxautodata/vegalite4s/spark/package.scala b/spark2/src/main/scala/com/coxautodata/vegalite4s/spark/package.scala
index 23a16c8..d587fe3 100644
--- a/spark2/src/main/scala/com/coxautodata/vegalite4s/spark/package.scala
+++ b/spark2/src/main/scala/com/coxautodata/vegalite4s/spark/package.scala
@@ -8,4 +8,7 @@ package object spark {
   val SPARK_DATASET_MAX_RECORDS = "spark.vegalite4s.maxRowsToCollect"
   val SPARK_DATASET_MAX_RECORDS_DEFAULT: Long = 10000
 
+  val SPARK_ATTEMPT_TO_STORE_AS_STATIC_ARROW_FILE = "spark.vegalite4s.tryStoreAsArrow"
+  val SPARK_ATTEMPT_TO_STORE_AS_STATIC_ARROW_FILE_DEFAULT: Boolean = true
+
 }
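Review note: `withData` consults candidate static stores only when `spark.vegalite4s.tryStoreAsArrow` is true (the default) and a store reports itself reachable; otherwise it falls back to collecting and embedding rows in the spec. Outside Databricks, `DatabricksFileStore.staticStoreExists` returns false, so the embedded path is taken anyway. A sketch of opting out explicitly, assuming a local session:

```scala
import com.coxautodata.vegalite4s.VegaLite
import com.coxautodata.vegalite4s.spark.PlotHelpers._
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[1]").getOrCreate()
import spark.implicits._

// Force the collect-and-embed behaviour: rows are inlined into the spec
spark.conf.set("spark.vegalite4s.tryStoreAsArrow", "false")

val plot = VegaLite()
  .withData((1 to 3).map(i => (i, i * i)).toDF("x", "y"))
```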
diff --git a/spark2/src/test/scala/com/coxautodata/vegalite4s/spark/PlotHelpersSpec.scala b/spark2/src/test/scala/com/coxautodata/vegalite4s/spark/PlotHelpersSpec.scala
index e546600..8e1d9e7 100644
--- a/spark2/src/test/scala/com/coxautodata/vegalite4s/spark/PlotHelpersSpec.scala
+++ b/spark2/src/test/scala/com/coxautodata/vegalite4s/spark/PlotHelpersSpec.scala
@@ -1,9 +1,14 @@
 package com.coxautodata.vegalite4s.spark
 
+import java.io.File
+import java.nio.file.Files
 import java.sql.{Date, Timestamp}
 
 import com.coxautodata.vegalite4s.VegaLite
 import com.coxautodata.vegalite4s.spark.PlotHelpers._
+import com.coxautodata.vegalite4s.spark.filestore.StaticFileStore
+import org.apache.commons.io.FileUtils
+import org.apache.hadoop.fs.Path
 import org.apache.spark.sql.{DataFrame, SparkSession}
 import org.scalatest.{FunSpec, Matchers}
 
@@ -368,7 +373,7 @@
     import spark.implicits._
 
     val data: DataFrame =
-      (97 to 122).map(i => TestScatterRecord(i, i*i)).toDF()
+      (97 to 122).map(i => TestScatterRecord(i, i * i)).toDF()
 
     val plot = VegaLite()
       .scatter(data)
@@ -514,6 +519,40 @@
       |}""".stripMargin)
   }
 
+  it("withArrowData") {
+    val temp = Files.createTempDirectory("test_output")
+    val spark = SparkSession.builder().master("local[1]").getOrCreate()
+    import spark.implicits._
+
+    val testStore = new StaticFileStore {
+      override def staticStoreExists(sparkSession: SparkSession): Boolean = true
+
+      override def generateOutputPath(): Path = new Path(temp.toString, "arrowout")
+
+      override def getStaticURL(arrowFile: Path): String = arrowFile.toString
+    }
+
+    val data: DataFrame =
+      (97 to 122).map(i => TestRecord(i.toChar.toString, i)).toDF()
+
+    VegaLite()
+      .withArrowData(data)(testStore)
+      .toJson(_.spaces2) should be(
+      s"""{
+         |  "$$schema" : "https://vega.github.io/schema/vega-lite/v3.json",
+         |  "data" : {
+         |    "url" : "file:${new File(testStore.generateOutputPath().toString).listFiles().map(_.toString).filter(_.endsWith(".arrow")).mkString}",
+         |    "format" : {
+         |      "type" : "arrow"
+         |    }
+         |  }
+         |}""".stripMargin
+    )
+
+    spark.stop()
+    FileUtils.deleteDirectory(temp.toFile)
+  }
+
 }
 
 case class TestRecord(a: String, b: Int)
diff --git a/spark2/src/test/scala/com/coxautodata/vegalite4s/spark/filestore/DatabricksFileStoreSpec.scala b/spark2/src/test/scala/com/coxautodata/vegalite4s/spark/filestore/DatabricksFileStoreSpec.scala
new file mode 100644
index 0000000..29749f6
--- /dev/null
+++ b/spark2/src/test/scala/com/coxautodata/vegalite4s/spark/filestore/DatabricksFileStoreSpec.scala
@@ -0,0 +1,20 @@
+package com.coxautodata.vegalite4s.spark.filestore
+
+import org.apache.hadoop.fs.Path
+import org.scalatest.{FunSpec, Matchers}
+
+class DatabricksFileStoreSpec extends FunSpec with Matchers {
+
+  it("generate a new working directory for each call") {
+
+    DatabricksFileStore.generateOutputPath() should not be DatabricksFileStore.generateOutputPath()
+
+  }
+
+  it("generate a relative url pointing to a databricks filestore") {
+
+    DatabricksFileStore.getStaticURL(new Path("dbfs:/FileStore/test/test.arrow")) should be("files/test/test.arrow")
+
+  }
+
+}
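Review note: the second assertion pins down the DBFS-to-HTTP mapping in `DatabricksFileStore.getStaticURL`. Traced by hand, with illustrative paths:

```scala
import org.apache.hadoop.fs.Path

val root = new Path("dbfs:/FileStore")
val arrowFile = new Path("dbfs:/FileStore/vegalite4s/1234/part-00000.arrow")

// URI.relativize strips the dbfs:/FileStore prefix, leaving the tail path;
// prefixing "files" gives the relative URL Databricks serves over HTTP
val url = new Path("files", root.toUri.relativize(arrowFile.toUri).getPath).toString
// url == "files/vegalite4s/1234/part-00000.arrow"
```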