[DE-829] Spark 3.5 (#58)
rashtao authored Jul 4, 2024
1 parent 4ddc137 commit c0df4ab
Showing 36 changed files with 2,906 additions and 44 deletions.
31 changes: 30 additions & 1 deletion .circleci/config.yml
@@ -385,6 +385,7 @@ workflows:
- '3.2'
- '3.3'
- '3.4'
- '3.5'
topology:
- 'single'
- 'cluster'
@@ -399,6 +400,7 @@ workflows:
- '3.2'
- '3.3'
- '3.4'
- '3.5'
topology:
- 'single'
- 'cluster'
@@ -412,6 +414,7 @@ workflows:
- '3.2'
- '3.3'
- '3.4'
- '3.5'
scala-version:
- '2.12'
- '2.13'
@@ -435,6 +438,7 @@ workflows:
- '3.2'
- '3.3'
- '3.4'
- '3.5'
scala-version:
- '2.12'
- '2.13'
@@ -484,6 +488,18 @@ workflows:
spark-full-version:
- '3.4.2'
- '3.4.3'
- integration-tests:
name: integration-spark<<matrix.spark-full-version>>-scala<<matrix.scala-version>>
matrix:
parameters:
spark-version:
- '3.5'
scala-version:
- '2.12'
- '2.13'
spark-full-version:
- '3.5.0'
- '3.5.1'
test-python:
when:
not: <<pipeline.parameters.docker-img>>
@@ -520,6 +536,16 @@ workflows:
- '3.4'
pyspark-version:
- '3.4.3'
- python-integration-tests:
name: test-pyspark-<<matrix.pyspark-version>>
matrix:
parameters:
python-executor:
- 'p312'
spark-version:
- '3.5'
pyspark-version:
- '3.5.1'
demo:
when:
not: <<pipeline.parameters.docker-img>>
@@ -532,6 +558,7 @@ workflows:
- '3.2'
- '3.3'
- '3.4'
- '3.5'
scala-version:
- '2.12'
- '2.13'
@@ -544,7 +571,7 @@
matrix:
parameters:
spark-version:
- '3.4'
- '3.5'
scala-version:
- '2.12'
deploy:
@@ -557,6 +584,7 @@ workflows:
- '3.2'
- '3.3'
- '3.4'
- '3.5'
scala-version:
- '2.12'
- '2.13'
@@ -576,6 +604,7 @@ workflows:
- '3.2'
- '3.3'
- '3.4'
- '3.5'
scala-version:
- '2.12'
- '2.13'
75 changes: 75 additions & 0 deletions arangodb-spark-datasource-3.5/pom.xml
@@ -0,0 +1,75 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>arangodb-spark-datasource</artifactId>
<groupId>com.arangodb</groupId>
<version>1.7.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>arangodb-spark-datasource-3.5_${scala.compat.version}</artifactId>

<name>arangodb-spark-datasource-3.5</name>
<description>ArangoDB Datasource for Apache Spark 3.5</description>
    <url>https://github.com/arangodb/arangodb-spark-datasource</url>

<developers>
<developer>
<name>Michele Rastelli</name>
            <url>https://github.com/rashtao</url>
</developer>
</developers>

<scm>
        <url>https://github.com/arangodb/arangodb-spark-datasource</url>
</scm>

<properties>
<maven.deploy.skip>false</maven.deploy.skip>
<sonar.coverage.jacoco.xmlReportPaths>../integration-tests/target/site/jacoco-aggregate/jacoco.xml</sonar.coverage.jacoco.xmlReportPaths>
<sonar.coverage.exclusions>src/main/scala/org/apache/spark/sql/arangodb/datasource/mapping/json/*</sonar.coverage.exclusions>
<sonar.exclusions>src/main/scala/org/apache/spark/sql/arangodb/datasource/mapping/json/*</sonar.exclusions>
<scalastyle.skip>false</scalastyle.skip>
</properties>

<dependencies>
<dependency>
<groupId>com.arangodb</groupId>
<artifactId>arangodb-spark-commons-${spark.compat.version}_${scala.compat.version}</artifactId>
<version>${project.version}</version>
<scope>compile</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.sonatype.plugins</groupId>
<artifactId>nexus-staging-maven-plugin</artifactId>
<extensions>true</extensions>
<configuration>
<skipNexusStagingDeployMojo>false</skipNexusStagingDeployMojo>
</configuration>
</plugin>
</plugins>
</build>

</project>
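
For consumers, the coordinates above translate into a Scala-cross-versioned dependency. A minimal sbt sketch, assuming the snapshot version from this pom (a released version would differ):

// build.sbt sketch: %% appends the Scala binary version (2.12 or 2.13), matching the _${scala.compat.version} suffix above
libraryDependencies += "com.arangodb" %% "arangodb-spark-datasource-3.5" % "1.7.0-SNAPSHOT"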
@@ -0,0 +1 @@
org.apache.spark.sql.arangodb.datasource.mapping.ArangoGeneratorProviderImpl
@@ -0,0 +1 @@
org.apache.spark.sql.arangodb.datasource.mapping.ArangoParserProviderImpl
@@ -0,0 +1 @@
com.arangodb.spark.DefaultSource
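
The three one-line files above (their paths are not shown in this view) look like Java service-provider registrations: the first two appear to register the generator and parser providers used by the mapping layer, and the last registers com.arangodb.spark.DefaultSource so Spark can discover it as a DataSourceRegister. A rough sketch of how such a registration is resolved, roughly what Spark's data source lookup does internally:

import java.util.ServiceLoader
import org.apache.spark.sql.sources.DataSourceRegister
import scala.collection.JavaConverters._

// Load every registered DataSourceRegister and pick the one whose short name is "arangodb".
val providers = ServiceLoader.load(classOf[DataSourceRegister]).asScala
val arango = providers.find(_.shortName() == "arangodb")
println(arango.map(_.getClass.getName)) // expected: com.arangodb.spark.DefaultSource when the jar is on the classpath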
@@ -0,0 +1,40 @@
package com.arangodb.spark

import org.apache.spark.sql.arangodb.commons.{ArangoClient, ArangoDBConf}
import org.apache.spark.sql.arangodb.datasource.ArangoTable
import org.apache.spark.sql.connector.catalog.{Table, TableProvider}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.sources.DataSourceRegister
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

import java.util

class DefaultSource extends TableProvider with DataSourceRegister {

private def extractOptions(options: util.Map[String, String]): ArangoDBConf = {
val opts: ArangoDBConf = ArangoDBConf(options)
if (opts.driverOptions.acquireHostList) {
val hosts = ArangoClient.acquireHostList(opts)
opts.updated(ArangoDBConf.ENDPOINTS, hosts.mkString(","))
} else {
opts
}
}

override def inferSchema(options: CaseInsensitiveStringMap): StructType = getTable(options).schema()

private def getTable(options: CaseInsensitiveStringMap): Table =
getTable(None, options.asCaseSensitiveMap()) // scalastyle:ignore null

override def getTable(schema: StructType, partitioning: Array[Transform], properties: util.Map[String, String]): Table =
getTable(Option(schema), properties)

override def supportsExternalMetadata(): Boolean = true

override def shortName(): String = "arangodb"

private def getTable(schema: Option[StructType], properties: util.Map[String, String]) =
new ArangoTable(schema, extractOptions(properties))

}
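
Given the short name registered above and the schema inference path, reading a collection could look roughly like the following sketch. The option keys ("endpoints", "database", "table") are assumptions based on typical ArangoDBConf usage and are not part of this diff:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("arangodb-read").getOrCreate()

// With no explicit schema, Spark calls inferSchema(), which builds an ArangoTable and infers from the collection.
val df = spark.read
  .format("arangodb")
  .option("endpoints", "localhost:8529") // assumed option key
  .option("database", "mydb")            // assumed option key
  .option("table", "users")              // assumed option key
  .load()
df.printSchema()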
@@ -0,0 +1,37 @@
package org.apache.spark.sql.arangodb.datasource

import org.apache.spark.sql.arangodb.commons.{ArangoDBConf, ArangoUtils}
import org.apache.spark.sql.arangodb.datasource.reader.ArangoScanBuilder
import org.apache.spark.sql.arangodb.datasource.writer.ArangoWriterBuilder
import org.apache.spark.sql.connector.catalog.{SupportsRead, SupportsWrite, Table, TableCapability}
import org.apache.spark.sql.connector.read.ScanBuilder
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

import java.util
import scala.collection.JavaConverters.setAsJavaSetConverter

class ArangoTable(private var schemaOpt: Option[StructType], options: ArangoDBConf) extends Table with SupportsRead with SupportsWrite {
private lazy val tableSchema = schemaOpt.getOrElse(ArangoUtils.inferSchema(options))

override def name(): String = this.getClass.toString

override def schema(): StructType = tableSchema

override def capabilities(): util.Set[TableCapability] = Set(
TableCapability.BATCH_READ,
TableCapability.BATCH_WRITE,
// TableCapability.STREAMING_WRITE,
TableCapability.ACCEPT_ANY_SCHEMA,
TableCapability.TRUNCATE
// TableCapability.OVERWRITE_BY_FILTER,
// TableCapability.OVERWRITE_DYNAMIC,
).asJava

override def newScanBuilder(scanOptions: CaseInsensitiveStringMap): ScanBuilder =
new ArangoScanBuilder(options.updated(ArangoDBConf(scanOptions)), schema())

override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder =
new ArangoWriterBuilder(info.schema(), options.updated(ArangoDBConf(info.options())))
}
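
Because supportsExternalMetadata() is true and the capabilities include BATCH_READ, BATCH_WRITE, ACCEPT_ANY_SCHEMA and TRUNCATE, a caller may pass its own schema on read and overwrite on write. A hedged sketch, reusing the SparkSession and the assumed option keys from the previous example:

import org.apache.spark.sql.types.{StringType, StructField, StructType}

// A user-supplied schema bypasses inference: Spark calls getTable(schema, partitioning, properties) directly.
val userSchema = StructType(Seq(
  StructField("_key", StringType),
  StructField("name", StringType)
))

val users = spark.read
  .format("arangodb")
  .schema(userSchema)
  .option("table", "users") // assumed option key
  .load()

// The TRUNCATE capability suggests overwrite can be honoured by truncating the target collection.
users.write
  .format("arangodb")
  .option("table", "users_copy") // assumed option key
  .mode("overwrite")
  .save()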
@@ -0,0 +1,46 @@
package org.apache.spark.sql.arangodb.datasource.mapping

import com.arangodb.jackson.dataformat.velocypack.VPackFactoryBuilder
import com.fasterxml.jackson.core.JsonFactoryBuilder
import org.apache.spark.sql.arangodb.commons.{ArangoDBConf, ContentType}
import org.apache.spark.sql.arangodb.commons.mapping.{ArangoGenerator, ArangoGeneratorProvider}
import org.apache.spark.sql.arangodb.datasource.mapping.json.{JSONOptions, JacksonGenerator}
import org.apache.spark.sql.types.{DataType, StructType}

import java.io.OutputStream

abstract sealed class ArangoGeneratorImpl(
schema: DataType,
writer: OutputStream,
options: JSONOptions)
extends JacksonGenerator(
schema,
options.buildJsonFactory().createGenerator(writer),
options) with ArangoGenerator

class ArangoGeneratorProviderImpl extends ArangoGeneratorProvider {
override def of(
contentType: ContentType,
schema: StructType,
outputStream: OutputStream,
conf: ArangoDBConf
): ArangoGeneratorImpl = contentType match {
case ContentType.JSON => new JsonArangoGenerator(schema, outputStream, conf)
case ContentType.VPACK => new VPackArangoGenerator(schema, outputStream, conf)
case _ => throw new IllegalArgumentException
}
}

class JsonArangoGenerator(schema: StructType, outputStream: OutputStream, conf: ArangoDBConf)
extends ArangoGeneratorImpl(
schema,
outputStream,
createOptions(new JsonFactoryBuilder().build(), conf)
)

class VPackArangoGenerator(schema: StructType, outputStream: OutputStream, conf: ArangoDBConf)
extends ArangoGeneratorImpl(
schema,
outputStream,
createOptions(new VPackFactoryBuilder().build(), conf)
)
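
The provider dispatches on the configured content type, producing either JSON or VelocyPack on the target stream. A small sketch of obtaining one (the write methods of ArangoGenerator live in the commons module and are not shown in this diff):

import java.io.ByteArrayOutputStream
import org.apache.spark.sql.arangodb.commons.{ArangoDBConf, ContentType}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

val schema = StructType(Seq(StructField("name", StringType)))
val out = new ByteArrayOutputStream()
val conf = ArangoDBConf(new java.util.HashMap[String, String]())

// ContentType.JSON yields a JsonArangoGenerator, ContentType.VPACK a VPackArangoGenerator.
val generator = new ArangoGeneratorProviderImpl().of(ContentType.JSON, schema, out, conf)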
@@ -0,0 +1,47 @@
package org.apache.spark.sql.arangodb.datasource.mapping

import com.arangodb.jackson.dataformat.velocypack.VPackFactoryBuilder
import com.fasterxml.jackson.core.json.JsonReadFeature
import com.fasterxml.jackson.core.{JsonFactory, JsonFactoryBuilder}
import org.apache.spark.sql.arangodb.commons.{ArangoDBConf, ContentType}
import org.apache.spark.sql.arangodb.commons.mapping.{ArangoParser, ArangoParserProvider, MappingUtils}
import org.apache.spark.sql.arangodb.datasource.mapping.json.{JSONOptions, JacksonParser}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types.DataType
import org.apache.spark.unsafe.types.UTF8String

abstract sealed class ArangoParserImpl(
schema: DataType,
options: JSONOptions,
recordLiteral: Array[Byte] => UTF8String)
extends JacksonParser(schema, options) with ArangoParser {
override def parse(data: Array[Byte]): Iterable[InternalRow] = super.parse(
data,
(jsonFactory: JsonFactory, record: Array[Byte]) => jsonFactory.createParser(record),
recordLiteral
)
}

class ArangoParserProviderImpl extends ArangoParserProvider {
override def of(contentType: ContentType, schema: DataType, conf: ArangoDBConf): ArangoParserImpl = contentType match {
case ContentType.JSON => new JsonArangoParser(schema, conf)
case ContentType.VPACK => new VPackArangoParser(schema, conf)
case _ => throw new IllegalArgumentException
}
}

class JsonArangoParser(schema: DataType, conf: ArangoDBConf)
extends ArangoParserImpl(
schema,
createOptions(new JsonFactoryBuilder()
.configure(JsonReadFeature.ALLOW_UNESCAPED_CONTROL_CHARS, true)
.build(), conf),
(bytes: Array[Byte]) => UTF8String.fromBytes(bytes)
)

class VPackArangoParser(schema: DataType, conf: ArangoDBConf)
extends ArangoParserImpl(
schema,
createOptions(new VPackFactoryBuilder().build(), conf),
(bytes: Array[Byte]) => UTF8String.fromString(MappingUtils.vpackToJson(bytes))
)
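
Symmetrically, the parser provider turns raw response bytes into Catalyst InternalRows, with the VPACK variant first converting VelocyPack to JSON via MappingUtils.vpackToJson. A rough sketch, reusing the schema and conf from the generator example:

// JSON bytes as they would arrive from the server; each parsed element is an InternalRow matching the schema.
val parser = new ArangoParserProviderImpl().of(ContentType.JSON, schema, conf)
val rows = parser.parse("""{"name":"alice"}""".getBytes("UTF-8"))
rows.foreach(println)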