[DE-598] Spark 3.4 (#45)
* Spark 3.4

* test fixes

* upd sonar spark-version

* test fixes
rashtao authored May 31, 2023
1 parent 1488e52 commit f0ee986
Showing 41 changed files with 2,817 additions and 42 deletions.
20 changes: 20 additions & 0 deletions .github/workflows/maven-deploy.yml
@@ -12,6 +12,26 @@ jobs:

    strategy:
      fail-fast: false
      matrix:
        include:
          - scala-version: 2.11
            spark-version: 2.4
          - scala-version: 2.12
            spark-version: 2.4
          - scala-version: 2.12
            spark-version: 3.1
          - scala-version: 2.12
            spark-version: 3.2
          - scala-version: 2.13
            spark-version: 3.2
          - scala-version: 2.12
            spark-version: 3.3
          - scala-version: 2.13
            spark-version: 3.3
          - scala-version: 2.12
            spark-version: 3.4
          - scala-version: 2.13
            spark-version: 3.4

    steps:
      - uses: actions/checkout@v2
8 changes: 8 additions & 0 deletions .github/workflows/maven-release.yml
@@ -24,6 +24,14 @@ jobs:
            spark-version: 3.2
          - scala-version: 2.13
            spark-version: 3.2
          - scala-version: 2.12
            spark-version: 3.3
          - scala-version: 2.13
            spark-version: 3.3
          - scala-version: 2.12
            spark-version: 3.4
          - scala-version: 2.13
            spark-version: 3.4

    steps:
      - uses: actions/checkout@v2
35 changes: 33 additions & 2 deletions .github/workflows/test.yml
@@ -38,6 +38,8 @@ jobs:
          - 2.4
          - 3.1
          - 3.2
          - 3.3
          - 3.4
        topology:
          - single
          - cluster
@@ -53,12 +55,18 @@
            spark-version: 3.1
          - scala-version: 2.11
            spark-version: 3.2
          - scala-version: 2.11
            spark-version: 3.3
          - scala-version: 2.11
            spark-version: 3.4
          - scala-version: 2.11
            java-version: 11
          - scala-version: 2.13
            spark-version: 2.4
          - scala-version: 2.13
            spark-version: 3.1
          - docker-img: docker.io/arangodb/arangodb:3.9.10
            java-version: 8
          - docker-img: docker.io/arangodb/arangodb:3.10.6
            java-version: 8
          - docker-img: docker.io/arangodb/arangodb:3.11.0
@@ -96,6 +104,8 @@ jobs:
          - 2.4
          - 3.1
          - 3.2
          - 3.3
          - 3.4
        topology:
          - cluster
        java-version:
@@ -107,6 +117,10 @@
            spark-version: 3.1
          - scala-version: 2.11
            spark-version: 3.2
          - scala-version: 2.11
            spark-version: 3.3
          - scala-version: 2.11
            spark-version: 3.4
          - scala-version: 2.13
            spark-version: 2.4
          - scala-version: 2.13
@@ -140,10 +154,15 @@
      matrix:
        python-version: [3.9]
        scala-version: [2.12]
        spark-version: [3.1, 3.2]
        spark-version: [3.1, 3.2, 3.3, 3.4]
        topology: [single, cluster]
        java-version: [8, 11]
        docker-img: ["docker.io/arangodb/arangodb:3.11.0"]
        exclude:
          - topology: cluster
            java-version: 8
          - topology: single
            java-version: 11

    steps:
      - uses: actions/checkout@v2
@@ -191,6 +210,8 @@ jobs:
          - 2.4
          - 3.1
          - 3.2
          - 3.3
          - 3.4
        topology:
          - single
        java-version:
@@ -203,6 +224,10 @@
            spark-version: 3.1
          - scala-version: 2.11
            spark-version: 3.2
          - scala-version: 2.11
            spark-version: 3.3
          - scala-version: 2.11
            spark-version: 3.4
          - scala-version: 2.13
            spark-version: 2.4
          - scala-version: 2.13
@@ -301,6 +326,12 @@ jobs:
          - spark-version: 3.3
            scala-version: 2.13
            spark-full-version: 3.3.2
          - spark-version: 3.4
            scala-version: 2.12
            spark-full-version: 3.4.0
          - spark-version: 3.4
            scala-version: 2.13
            spark-full-version: 3.4.0

    steps:
      - uses: actions/checkout@v2
@@ -331,7 +362,7 @@ jobs:
        scala-version:
          - 2.12
        spark-version:
          - 3.2
          - 3.4
        topology:
          - single
        java-version:
79 changes: 79 additions & 0 deletions arangodb-spark-datasource-3.4/pom.xml
@@ -0,0 +1,79 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>arangodb-spark-datasource</artifactId>
        <groupId>com.arangodb</groupId>
        <version>1.4.3</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>arangodb-spark-datasource-3.4_${scala.compat.version}</artifactId>

    <name>arangodb-spark-datasource-3.4</name>
    <description>ArangoDB Datasource for Apache Spark 3.4</description>
    <url>https://github.com/arangodb/arangodb-spark-datasource</url>

    <developers>
        <developer>
            <name>Michele Rastelli</name>
            <url>https://github.com/rashtao</url>
        </developer>
    </developers>

    <scm>
        <url>https://github.com/arangodb/arangodb-spark-datasource</url>
    </scm>

    <properties>
        <maven.deploy.skip>false</maven.deploy.skip>
        <sonar.coverage.jacoco.xmlReportPaths>../integration-tests/target/site/jacoco-aggregate/jacoco.xml</sonar.coverage.jacoco.xmlReportPaths>
        <sonar.coverage.exclusions>src/main/scala/org/apache/spark/sql/arangodb/datasource/mapping/json/*</sonar.coverage.exclusions>
        <sonar.exclusions>src/main/scala/org/apache/spark/sql/arangodb/datasource/mapping/json/*</sonar.exclusions>
        <scalastyle.skip>false</scalastyle.skip>
    </properties>

    <dependencies>
        <dependency>
            <groupId>com.arangodb</groupId>
            <artifactId>arangodb-spark-commons-${spark.compat.version}_${scala.compat.version}</artifactId>
            <version>${project.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
            <version>4.5.13</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.sonatype.plugins</groupId>
                <artifactId>nexus-staging-maven-plugin</artifactId>
                <extensions>true</extensions>
                <configuration>
                    <skipNexusStagingDeployMojo>false</skipNexusStagingDeployMojo>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>
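
The three one-line resource files below are presumably Java ServiceLoader provider registrations (the mechanism Spark uses to resolve DataSourceRegister implementations, and which the commons module appears to use for its mapping providers); their paths are not shown in this view.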
@@ -0,0 +1 @@
org.apache.spark.sql.arangodb.datasource.mapping.ArangoGeneratorProviderImpl
@@ -0,0 +1 @@
org.apache.spark.sql.arangodb.datasource.mapping.ArangoParserProviderImpl
@@ -0,0 +1 @@
com.arangodb.spark.DefaultSource
@@ -0,0 +1,40 @@
package com.arangodb.spark

import org.apache.spark.sql.arangodb.commons.{ArangoClient, ArangoDBConf}
import org.apache.spark.sql.arangodb.datasource.ArangoTable
import org.apache.spark.sql.connector.catalog.{Table, TableProvider}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.sources.DataSourceRegister
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

import java.util

class DefaultSource extends TableProvider with DataSourceRegister {

  private def extractOptions(options: util.Map[String, String]): ArangoDBConf = {
    val opts: ArangoDBConf = ArangoDBConf(options)
    if (opts.driverOptions.acquireHostList) {
      val hosts = ArangoClient.acquireHostList(opts)
      opts.updated(ArangoDBConf.ENDPOINTS, hosts.mkString(","))
    } else {
      opts
    }
  }

  override def inferSchema(options: CaseInsensitiveStringMap): StructType = getTable(options).schema()

  private def getTable(options: CaseInsensitiveStringMap): Table =
    getTable(None, options.asCaseSensitiveMap()) // scalastyle:ignore null

  override def getTable(schema: StructType, partitioning: Array[Transform], properties: util.Map[String, String]): Table =
    getTable(Option(schema), properties)

  override def supportsExternalMetadata(): Boolean = true

  override def shortName(): String = "arangodb"

  private def getTable(schema: Option[StructType], properties: util.Map[String, String]) =
    new ArangoTable(schema, extractOptions(properties))

}
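
For context (not part of this commit): because DefaultSource registers the short name "arangodb" via DataSourceRegister, the datasource can be addressed with format("arangodb"). A minimal read sketch follows; only "endpoints" is grounded in the code above (ArangoDBConf.ENDPOINTS), while the "table" option name is an illustrative assumption.

// Minimal read sketch, assuming a reachable ArangoDB deployment and
// Spark 3.4 with this datasource on the classpath.
import org.apache.spark.sql.SparkSession

object ReadExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("arangodb-read").getOrCreate()

    val df = spark.read
      .format("arangodb")                    // short name registered by DefaultSource
      .option("endpoints", "localhost:8529") // mirrors ArangoDBConf.ENDPOINTS above
      .option("table", "users")              // hypothetical collection option
      .load()                                // triggers inferSchema -> ArangoTable.schema()

    df.show()
    spark.stop()
  }
}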
@@ -0,0 +1,37 @@
package org.apache.spark.sql.arangodb.datasource

import org.apache.spark.sql.arangodb.commons.{ArangoDBConf, ArangoUtils}
import org.apache.spark.sql.arangodb.datasource.reader.ArangoScanBuilder
import org.apache.spark.sql.arangodb.datasource.writer.ArangoWriterBuilder
import org.apache.spark.sql.connector.catalog.{SupportsRead, SupportsWrite, Table, TableCapability}
import org.apache.spark.sql.connector.read.ScanBuilder
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

import java.util
import scala.collection.JavaConverters.setAsJavaSetConverter

class ArangoTable(private var schemaOpt: Option[StructType], options: ArangoDBConf) extends Table with SupportsRead with SupportsWrite {
  private lazy val tableSchema = schemaOpt.getOrElse(ArangoUtils.inferSchema(options))

  override def name(): String = this.getClass.toString

  override def schema(): StructType = tableSchema

  override def capabilities(): util.Set[TableCapability] = Set(
    TableCapability.BATCH_READ,
    TableCapability.BATCH_WRITE,
    // TableCapability.STREAMING_WRITE,
    TableCapability.ACCEPT_ANY_SCHEMA,
    TableCapability.TRUNCATE
    // TableCapability.OVERWRITE_BY_FILTER,
    // TableCapability.OVERWRITE_DYNAMIC,
  ).asJava

  override def newScanBuilder(scanOptions: CaseInsensitiveStringMap): ScanBuilder =
    new ArangoScanBuilder(options.updated(ArangoDBConf(scanOptions)), schema())

  override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder =
    new ArangoWriterBuilder(info.schema(), options.updated(ArangoDBConf(info.options())))
}
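
A hedged sketch of the schema fallback above: ArangoTable calls ArangoUtils.inferSchema only when schemaOpt is empty, and supportsExternalMetadata() in DefaultSource lets callers supply their own, so an explicit .schema(...) skips inference entirely. Connection option names remain assumptions as before.

// Sketch: an explicit schema arrives as schemaOpt = Some(schema), so the
// lazy tableSchema fallback to ArangoUtils.inferSchema is never evaluated.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object ExplicitSchemaExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("arangodb-schema").getOrCreate()

    val schema = StructType(Seq(
      StructField("name", StringType),
      StructField("age", IntegerType)
    ))

    val df = spark.read
      .format("arangodb")
      .schema(schema)                        // bypasses schema inference
      .option("endpoints", "localhost:8529") // hypothetical connection options as before
      .load()

    df.printSchema()
    spark.stop()
  }
}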
@@ -0,0 +1,46 @@
package org.apache.spark.sql.arangodb.datasource.mapping

import com.arangodb.jackson.dataformat.velocypack.VPackFactoryBuilder
import com.fasterxml.jackson.core.JsonFactoryBuilder
import org.apache.spark.sql.arangodb.commons.{ArangoDBConf, ContentType}
import org.apache.spark.sql.arangodb.commons.mapping.{ArangoGenerator, ArangoGeneratorProvider}
import org.apache.spark.sql.arangodb.datasource.mapping.json.{JSONOptions, JacksonGenerator}
import org.apache.spark.sql.types.{DataType, StructType}

import java.io.OutputStream

abstract sealed class ArangoGeneratorImpl(
    schema: DataType,
    writer: OutputStream,
    options: JSONOptions)
  extends JacksonGenerator(
    schema,
    options.buildJsonFactory().createGenerator(writer),
    options) with ArangoGenerator

class ArangoGeneratorProviderImpl extends ArangoGeneratorProvider {
  override def of(
      contentType: ContentType,
      schema: StructType,
      outputStream: OutputStream,
      conf: ArangoDBConf
  ): ArangoGeneratorImpl = contentType match {
    case ContentType.JSON => new JsonArangoGenerator(schema, outputStream, conf)
    case ContentType.VPACK => new VPackArangoGenerator(schema, outputStream, conf)
    case _ => throw new IllegalArgumentException
  }
}

class JsonArangoGenerator(schema: StructType, outputStream: OutputStream, conf: ArangoDBConf)
  extends ArangoGeneratorImpl(
    schema,
    outputStream,
    createOptions(new JsonFactoryBuilder().build(), conf)
  )

class VPackArangoGenerator(schema: StructType, outputStream: OutputStream, conf: ArangoDBConf)
  extends ArangoGeneratorImpl(
    schema,
    outputStream,
    createOptions(new VPackFactoryBuilder().build(), conf)
  )
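
A small sketch of the dispatch above: ArangoGeneratorProviderImpl.of selects a Jackson backend by ContentType (plain JSON vs. VelocyPack via VPackFactoryBuilder). Constructing ArangoDBConf from a plain map mirrors the ArangoDBConf(options) usage seen in DefaultSource and is an assumption here.

// Assumed-API sketch: obtain a generator that writes rows of `schema` as
// JSON into `out`; ContentType.VPACK would select VPackArangoGenerator instead.
import java.io.ByteArrayOutputStream
import java.util.Collections
import org.apache.spark.sql.arangodb.commons.{ArangoDBConf, ContentType}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object GeneratorSketch {
  def main(args: Array[String]): Unit = {
    val schema = StructType(Seq(StructField("name", StringType)))
    val out = new ByteArrayOutputStream()
    val conf = ArangoDBConf(Collections.emptyMap[String, String]())

    // ContentType.JSON -> JsonArangoGenerator; ContentType.VPACK -> VPackArangoGenerator
    val generator = new ArangoGeneratorProviderImpl().of(ContentType.JSON, schema, out, conf)
  }
}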