diff --git a/build.sbt b/build.sbt
index a7a9546..3f721f3 100644
--- a/build.sbt
+++ b/build.sbt
@@ -3,6 +3,13 @@ import java.time.Year
 
 val scalaVersions = Seq("2.12.15")
 
+val sparkVersion = "3.1.3"
+val catsVersion = "2.6.1"
+val shapelessVersion = "2.3.3" // to be compatible with Spark 3.1.x
+val scalaTestVersion = "3.2.11"
+val geomesaVersion = "3.3.0"
+val geotrellisVersion = "3.6.1+0-6b5868af+20220321-1909-SNAPSHOT" //"3.6.1"
+
 lazy val commonSettings = Seq(
   scalaVersion := scalaVersions.head,
   crossScalaVersions := scalaVersions,
@@ -52,7 +59,7 @@ lazy val root = (project in file("."))
     publish := {},
     publishLocal := {}
   )
-  .aggregate(core, spatial)
+  .aggregate(core, spatial, `spatial-index`)
 
 lazy val core = project
   .settings(commonSettings)
@@ -60,10 +67,10 @@ lazy val core = project
   .settings(
     addCompilerPlugin("org.typelevel" % "kind-projector" % "0.13.2" cross CrossVersion.full),
     libraryDependencies ++= Seq(
-      "org.typelevel" %% "cats-core" % "2.6.1",
-      "com.chuusai" %% "shapeless" % "2.3.3", // to be compatible with Spark 3.1.x
-      "org.apache.spark" %% "spark-hive" % "3.1.2" % Provided,
-      "org.scalatest" %% "scalatest" % "3.2.11" % Test
+      "org.typelevel" %% "cats-core" % catsVersion,
+      "com.chuusai" %% "shapeless" % shapelessVersion,
+      "org.apache.spark" %% "spark-hive" % sparkVersion % Provided,
+      "org.scalatest" %% "scalatest" % scalaTestVersion % Test
     )
   )
 
@@ -73,8 +80,19 @@ lazy val spatial = project
   .settings(name := "hiveless-spatial")
   .settings(
     libraryDependencies ++= Seq(
-      "org.locationtech.geomesa" %% "geomesa-spark-jts" % "3.3.0",
-      "org.scalatest" %% "scalatest" % "3.2.10" % Test
+      "org.locationtech.geomesa" %% "geomesa-spark-jts" % geomesaVersion,
+      "org.scalatest" %% "scalatest" % scalaTestVersion % Test
+    )
+  )
+
+lazy val `spatial-index` = project
+  .dependsOn(spatial % "compile->compile;provided->provided")
+  .settings(commonSettings)
+  .settings(name := "hiveless-spatial-index")
+  .settings(
+    libraryDependencies ++= Seq(
+      "org.locationtech.geotrellis" %% "geotrellis-store" % geotrellisVersion,
+      "org.scalatest" %% "scalatest" % scalaTestVersion % Test
     ),
     assembly / test := {},
     assembly / assemblyShadeRules := {
diff --git a/core/src/main/scala/com/azavea/hiveless/implicits/syntax.scala b/core/src/main/scala/com/azavea/hiveless/implicits/syntax.scala
index af3156d..8d86092 100644
--- a/core/src/main/scala/com/azavea/hiveless/implicits/syntax.scala
+++ b/core/src/main/scala/com/azavea/hiveless/implicits/syntax.scala
@@ -16,10 +16,11 @@
 package com.azavea.hiveless.implicits
 
-import com.azavea.hiveless.serializers.HDeserialier
+import com.azavea.hiveless.serializers.{HConverter, HDeserialier, HSerializer, UnaryDeserializer}
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDF
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector
 
-object syntax {
+object syntax extends Serializable {
   implicit class DeferredObjectOps(val self: GenericUDF.DeferredObject) extends AnyVal {
 
     /** Behaves like a regular get, but throws when the result is null.
       */
@@ -28,4 +29,17 @@ object syntax {
       case _ => throw HDeserialier.Errors.NullArgument
     }
   }
+
+  implicit class ArrayDeferredObjectOps(val self: Array[GenericUDF.DeferredObject]) extends AnyVal {
+    def deserialize[F[_], T: UnaryDeserializer[F, *]](inspectors: Array[ObjectInspector]): F[T] =
+      UnaryDeserializer[F, T].deserialize(self, inspectors)
+  }
+
+  implicit class ConverterOps(val self: Any) extends AnyVal {
+    def convert[T: HConverter]: T = HConverter[T].convert(self)
+  }
+
+  implicit class SerializerOps[T](val self: T) extends AnyVal {
+    def serialize(implicit ev: HSerializer[T]): Any = HSerializer[T].serialize(self)
+  }
 }
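Note: the three extension classes above let converter/serializer plumbing read left-to-right at call sites; the spatial packages further down in this diff use all three. A minimal sketch of the intended style (the String instances already ship with hiveless core):

    import com.azavea.hiveless.implicits.syntax._

    val raw: Any      = "hello"             // a value as Hive hands it over
    val typed: String = raw.convert[String] // ConverterOps -> HConverter[String]
    val hive: Any     = typed.serialize     // SerializerOps -> HSerializer[String]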
diff --git a/core/src/main/scala/com/azavea/hiveless/serializers/GenericDeserializer.scala b/core/src/main/scala/com/azavea/hiveless/serializers/GenericDeserializer.scala
index c713703..d451d7e 100644
--- a/core/src/main/scala/com/azavea/hiveless/serializers/GenericDeserializer.scala
+++ b/core/src/main/scala/com/azavea/hiveless/serializers/GenericDeserializer.scala
@@ -70,7 +70,10 @@
     dh: UnaryDeserializer[F, H],
     dt: GenericDeserializer[F, T]
   ): GenericDeserializer[F, H :: T] = new GenericDeserializer[F, H :: T] {
-    def deserialize(arguments: Array[GenericUDF.DeferredObject], inspectors: Array[ObjectInspector]): F[H :: T] =
-      (dh.deserialize(arguments.head, inspectors.head), dt.deserialize(arguments.tail, inspectors.tail)).mapN(_ :: _)
+    def deserialize(arguments: Array[GenericUDF.DeferredObject], inspectors: Array[ObjectInspector]): F[H :: T] = {
+      // take and drop are total: on an exhausted argument array they yield empty arrays
+      // (where head and tail would throw), which is how Option arguments decode to None
+      (dh.deserialize(arguments.take(1), inspectors.take(1)), dt.deserialize(arguments.drop(1), inspectors.drop(1))).mapN(_ :: _)
+    }
   }
 }
diff --git a/core/src/main/scala/com/azavea/hiveless/serializers/UnaryDeserializer.scala b/core/src/main/scala/com/azavea/hiveless/serializers/UnaryDeserializer.scala
index c8d312e..bb2abff 100644
--- a/core/src/main/scala/com/azavea/hiveless/serializers/UnaryDeserializer.scala
+++ b/core/src/main/scala/com/azavea/hiveless/serializers/UnaryDeserializer.scala
@@ -22,6 +22,7 @@ import org.apache.spark.sql.hive.HivelessInternals.unwrap
 import org.apache.spark.sql.types.Decimal
 import org.apache.spark.unsafe.types.UTF8String
 import cats.Id
+import cats.syntax.apply._
 import org.apache.spark.sql.catalyst.util.ArrayData
 import shapeless.HNil
 
@@ -45,6 +46,10 @@ object UnaryDeserializer extends Serializable {
   implicit def tryUnaryDeserializer[T: UnaryDeserializer[Id, *]]: UnaryDeserializer[Try, T] =
     (arguments, inspectors) => Try(UnaryDeserializer[Id, T].deserialize(arguments, inspectors))
 
+  /** Derive optional UnaryDeserializers: a missing argument deserializes to None. */
+  implicit def optionalUnaryDeserializer[T: UnaryDeserializer[Id, *]]: UnaryDeserializer[Id, Option[T]] =
+    (arguments, inspectors) => (arguments.headOption, inspectors.headOption).mapN(UnaryDeserializer[Id, T].deserialize)
+
   /** Derivation helper deserializer.
     */
   implicit val hnilUnaryDeserializer: UnaryDeserializer[Id, HNil] = (_, _) => HNil
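Note: the two changes above cooperate to make trailing Option parameters total. GenericDeserializer now slices with take(1)/drop(1), so when a UDF is invoked with fewer arguments than its parameter HList expects, the remaining deserializers receive empty arrays, and optionalUnaryDeserializer's headOption/mapN turns that absence into None. A hypothetical UDF relying on this (the name, signature, and default tolerance are illustrative only, not part of this change):

    import com.azavea.hiveless.HUDF
    import com.azavea.hiveless.spatial._
    import org.locationtech.jts.geom.Geometry
    import org.locationtech.jts.simplify.TopologyPreservingSimplifier

    // callable from SQL as st_exampleSimplify(geom) or st_exampleSimplify(geom, 0.1)
    class ST_ExampleSimplify extends HUDF[(Geometry, Option[Double]), Geometry] {
      val name: String = "st_exampleSimplify"
      def function = { case (geom: Geometry, toleranceOpt: Option[Double]) =>
        TopologyPreservingSimplifier.simplify(geom, toleranceOpt.getOrElse(0.01))
      }
    }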
diff --git a/spatial-index/sql/createUDFs.sql b/spatial-index/sql/createUDFs.sql
new file mode 100644
index 0000000..b20541a
--- /dev/null
+++ b/spatial-index/sql/createUDFs.sql
@@ -0,0 +1 @@
+CREATE OR REPLACE FUNCTION ST_partitionCentroid AS 'com.azavea.hiveless.spatial.index.ST_PartitionCentroid';
diff --git a/spatial-index/src/main/scala/com/azavea/hiveless/spatial/index/ST_PartitionCentroid.scala b/spatial-index/src/main/scala/com/azavea/hiveless/spatial/index/ST_PartitionCentroid.scala
new file mode 100644
index 0000000..67d6268
--- /dev/null
+++ b/spatial-index/src/main/scala/com/azavea/hiveless/spatial/index/ST_PartitionCentroid.scala
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2022 Azavea
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.azavea.hiveless.spatial.index
+
+import com.azavea.hiveless.HUDF
+import com.azavea.hiveless.spatial._
+import geotrellis.layer.{SpatialKey, ZoomedLayoutScheme}
+import geotrellis.vector._
+import geotrellis.proj4.{CRS, LatLng}
+import geotrellis.store.index.zcurve.Z2
+import org.locationtech.jts.geom.Geometry
+
+class ST_PartitionCentroid extends HUDF[(Geometry, Int, Option[CRS], Option[Int], Option[Double], Option[Int]), Long] {
+  val name: String = "st_partitionCentroid"
+  def function = {
+    case (geom: Geometry, zoom: Int, crsOpt: Option[CRS], tileSizeOpt: Option[Int], resolutionThresholdOpt: Option[Double], bitsOpt: Option[Int]) =>
+      // fall back to defaults for omitted optional arguments
+      val crs = crsOpt.getOrElse(LatLng)
+      val tileSize = tileSizeOpt.getOrElse(ZoomedLayoutScheme.DEFAULT_TILE_SIZE)
+      val resolutionThreshold = resolutionThresholdOpt.getOrElse(ZoomedLayoutScheme.DEFAULT_RESOLUTION_THRESHOLD)
+      val bits = bitsOpt.getOrElse(8)
+
+      // compute the SpatialKey of the geometry's centroid at the given zoom
+      val SpatialKey(col, row) = new ZoomedLayoutScheme(crs, tileSize, resolutionThreshold)
+        .levelForZoom(zoom)
+        .layout
+        .mapTransform(geom.extent.center)
+
+      Z2(col, row).z >> bits
+  }
+}
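Usage sketch for the new UDF, assuming the hiveless-spatial-index assembly is on the classpath; the geoms table and its WKB-encoded geom column are hypothetical:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().enableHiveSupport().getOrCreate()

    // register the function, mirroring spatial-index/sql/createUDFs.sql
    spark.sql(
      "CREATE OR REPLACE FUNCTION ST_partitionCentroid AS 'com.azavea.hiveless.spatial.index.ST_PartitionCentroid'"
    )

    // the four trailing arguments (CRS, tile size, resolution threshold, bits)
    // are Options and may simply be omitted; the UDF falls back to its defaults
    spark.sql("SELECT ST_partitionCentroid(geom, 9) AS pkey FROM geoms").show()

    // a CRS is passed as a proj4 string, matching the CRS.fromString-based converter below
    spark.sql(
      "SELECT ST_partitionCentroid(geom, 9, '+proj=longlat +datum=WGS84 +no_defs') AS pkey FROM geoms"
    ).show()

The right shift by bits (8 unless overridden) coarsens the Z2 index, so centroids falling in nearby tiles collapse onto the same partition key.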
diff --git a/spatial-index/src/main/scala/com/azavea/hiveless/spatial/index/package.scala b/spatial-index/src/main/scala/com/azavea/hiveless/spatial/index/package.scala
new file mode 100644
index 0000000..2b6c950
--- /dev/null
+++ b/spatial-index/src/main/scala/com/azavea/hiveless/spatial/index/package.scala
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2022 Azavea
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.azavea.hiveless.spatial
+
+import com.azavea.hiveless.serializers.{HConverter, HSerializer, UnaryDeserializer}
+import com.azavea.hiveless.implicits.syntax._
+import cats.Id
+import geotrellis.proj4.CRS
+import org.apache.spark.sql.types.{DataType, StringType}
+
+package object index {
+  implicit def crsConverter: HConverter[CRS] = new HConverter[CRS] {
+    def convert(argument: Any): CRS = CRS.fromString(argument.convert[String])
+  }
+
+  implicit def crsUnaryDeserializer: UnaryDeserializer[Id, CRS] =
+    (arguments, inspectors) => arguments.deserialize[Id, String](inspectors).convert[CRS]
+
+  implicit def crsSerializer: HSerializer[CRS] = new HSerializer[CRS] {
+    def dataType: DataType = StringType
+    def serialize: CRS => Any = crs => crs.toProj4String.serialize
+  }
+}
diff --git a/spatial/src/main/scala/com/azavea/hiveless/spatial/package.scala b/spatial/src/main/scala/com/azavea/hiveless/spatial/package.scala
index 6a0298f..4c2a528 100644
--- a/spatial/src/main/scala/com/azavea/hiveless/spatial/package.scala
+++ b/spatial/src/main/scala/com/azavea/hiveless/spatial/package.scala
@@ -17,22 +17,22 @@
 package com.azavea.hiveless
 
 import com.azavea.hiveless.serializers.{HConverter, HSerializer, UnaryDeserializer}
+import com.azavea.hiveless.implicits.syntax._
 import cats.Id
 import org.locationtech.jts.geom.Geometry
-import org.apache.spark.sql.jts.GeometryUDT
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.types.DataType
+import org.apache.spark.sql.types.{BinaryType, DataType}
+import org.locationtech.geomesa.spark.jts.util.WKBUtils
 
 package object spatial extends Serializable {
   implicit def geometryConverter[T <: Geometry]: HConverter[T] = new HConverter[T] {
-    def convert(argument: Any): T = GeometryUDT.deserialize(argument).asInstanceOf[T]
+    def convert(argument: Any): T = WKBUtils.read(argument.asInstanceOf[Array[Byte]]).asInstanceOf[T]
   }
 
   implicit def geometryUnaryDeserializer[T <: Geometry: HConverter]: UnaryDeserializer[Id, T] =
-    (arguments, inspectors) => HConverter[T].convert(UnaryDeserializer[Id, InternalRow].deserialize(arguments, inspectors))
+    (arguments, inspectors) => arguments.deserialize[Id, Array[Byte]](inspectors).convert[T]
 
   implicit def geometrySerializer[T <: Geometry]: HSerializer[T] = new HSerializer[T] {
-    def dataType: DataType = GeometryUDT
-    def serialize: Geometry => InternalRow = GeometryUDT.serialize
+    def dataType: DataType = BinaryType
+    def serialize: Geometry => Array[Byte] = WKBUtils.write
   }
 }
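Note: swapping GeometryUDT/InternalRow for WKB over plain BinaryType decouples the Hive side from Spark-internal encodings while staying readable by GeoMesa tooling. A round-trip sketch of the representation geometrySerializer and geometryConverter now agree on:

    import org.locationtech.geomesa.spark.jts.util.WKBUtils
    import org.locationtech.jts.geom.{Coordinate, GeometryFactory}

    val gf    = new GeometryFactory()
    val point = gf.createPoint(new Coordinate(-75.16, 39.95))

    val bytes: Array[Byte] = WKBUtils.write(point) // what geometrySerializer emits
    val back               = WKBUtils.read(bytes)  // what geometryConverter consumes
    assert(point.equalsExact(back))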