Add spatial-index subproject
pomadchin committed Mar 22, 2022
1 parent 978febc commit fab56b6
Showing 8 changed files with 141 additions and 18 deletions.
32 changes: 25 additions & 7 deletions build.sbt
@@ -3,6 +3,13 @@ import java.time.Year
 
 val scalaVersions = Seq("2.12.15")
 
+val sparkVersion      = "3.1.3"
+val catsVersion       = "2.6.1"
+val shapelessVersion  = "2.3.3" // to be compatible with Spark 3.1.x
+val scalaTestVersion  = "3.2.11"
+val geomesaVersion    = "3.3.0"
+val geotrellisVersion = "3.6.1+0-6b5868af+20220321-1909-SNAPSHOT" //"3.6.1"
+
 lazy val commonSettings = Seq(
   scalaVersion := scalaVersions.head,
   crossScalaVersions := scalaVersions,
@@ -52,18 +59,18 @@ lazy val root = (project in file("."))
     publish := {},
     publishLocal := {}
   )
-  .aggregate(core, spatial)
+  .aggregate(core, spatial, `spatial-index`)
 
 lazy val core = project
   .settings(commonSettings)
   .settings(name := "hiveless-core")
   .settings(
     addCompilerPlugin("org.typelevel" % "kind-projector" % "0.13.2" cross CrossVersion.full),
     libraryDependencies ++= Seq(
-      "org.typelevel"    %% "cats-core"  % "2.6.1",
-      "com.chuusai"      %% "shapeless"  % "2.3.3", // to be compatible with Spark 3.1.x
-      "org.apache.spark" %% "spark-hive" % "3.1.2" % Provided,
-      "org.scalatest"    %% "scalatest"  % "3.2.11" % Test
+      "org.typelevel"    %% "cats-core"  % catsVersion,
+      "com.chuusai"      %% "shapeless"  % shapelessVersion,
+      "org.apache.spark" %% "spark-hive" % sparkVersion % Provided,
+      "org.scalatest"    %% "scalatest"  % scalaTestVersion % Test
     )
   )
 
@@ -73,8 +80,19 @@ lazy val spatial = project
   .settings(name := "hiveless-spatial")
   .settings(
     libraryDependencies ++= Seq(
-      "org.locationtech.geomesa" %% "geomesa-spark-jts" % "3.3.0",
-      "org.scalatest"            %% "scalatest"         % "3.2.10" % Test
+      "org.locationtech.geomesa" %% "geomesa-spark-jts" % geomesaVersion,
+      "org.scalatest"            %% "scalatest"         % scalaTestVersion % Test
    )
  )
 
+lazy val `spatial-index` = project
+  .dependsOn(spatial % "compile->compile;provided->provided")
+  .settings(commonSettings)
+  .settings(name := "hiveless-spatial-index")
+  .settings(
+    libraryDependencies ++= Seq(
+      "org.locationtech.geotrellis" %% "geotrellis-store" % geotrellisVersion,
+      "org.scalatest"               %% "scalatest"        % scalaTestVersion % Test
+    ),
+    assembly / test := {},
+    assembly / assemblyShadeRules := {
…
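
The assemblyShadeRules body is truncated in this view. For context, a typical sbt-assembly rule in this position (illustrative only, not the commit's actual rules) renames a dependency's packages so the fat jar cannot clash with the copies Spark ships:

    // hypothetical shade rule, not part of this commit's diff
    assembly / assemblyShadeRules := Seq(
      ShadeRule.rename("shapeless.**" -> "com.azavea.shaded.shapeless.@1").inAll
    )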
18 changes: 16 additions & 2 deletions core/src/main/scala/com/azavea/hiveless/implicits/syntax.scala
@@ -16,10 +16,11 @@
 
 package com.azavea.hiveless.implicits
 
-import com.azavea.hiveless.serializers.HDeserialier
+import com.azavea.hiveless.serializers.{HConverter, HDeserialier, HSerializer, UnaryDeserializer}
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDF
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector
 
-object syntax {
+object syntax extends Serializable {
   implicit class DeferredObjectOps(val self: GenericUDF.DeferredObject) extends AnyVal {
+
     /** Behaves like a regular get, but throws when the result is null. */
@@ -28,4 +29,17 @@ object syntax {
       case _ => throw HDeserialier.Errors.NullArgument
     }
   }
+
+  implicit class ArrayDeferredObjectOps(val self: Array[GenericUDF.DeferredObject]) extends AnyVal {
+    def deserialize[F[_], T: UnaryDeserializer[F, *]](inspectors: Array[ObjectInspector]): F[T] =
+      UnaryDeserializer[F, T].deserialize(self, inspectors)
+  }
+
+  implicit class ConverterOps(val self: Any) extends AnyVal {
+    def convert[T: HConverter]: T = HConverter[T].convert(self)
+  }
+
+  implicit class SerializerOps[T](val self: T) extends AnyVal {
+    def serialize(implicit ev: HSerializer[T]): Any = HSerializer[T].serialize(self)
+  }
 }
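
Note on the new syntax: the three extension methods above give serializer code a fluent style. A minimal sketch of how they compose (it assumes HConverter[String] and HSerializer[String] instances from hiveless-core; the values are placeholders):

    import com.azavea.hiveless.implicits.syntax._
    val raw: Any    = "EPSG:4326"          // a value as handed over by Hive
    val str: String = raw.convert[String]  // ConverterOps, via HConverter[String]
    val back: Any   = str.serialize        // SerializerOps, via HSerializer[String]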
7 changes: 5 additions & 2 deletions core/src/main/scala/com/azavea/hiveless/serializers/GenericDeserializer.scala
@@ -70,7 +70,10 @@ object GenericDeserializer extends Serializable {
     dh: UnaryDeserializer[F, H],
     dt: GenericDeserializer[F, T]
   ): GenericDeserializer[F, H :: T] = new GenericDeserializer[F, H :: T] {
-    def deserialize(arguments: Array[GenericUDF.DeferredObject], inspectors: Array[ObjectInspector]): F[H :: T] =
-      (dh.deserialize(arguments.head, inspectors.head), dt.deserialize(arguments.tail, inspectors.tail)).mapN(_ :: _)
+    def deserialize(arguments: Array[GenericUDF.DeferredObject], inspectors: Array[ObjectInspector]): F[H :: T] = {
+      // take and drop are total on short arrays, which lets optional (trailing) arguments be handled safely;
+      // take(1) is used instead of head purely for symmetry with drop(1)
+      (dh.deserialize(arguments.take(1), inspectors.take(1)), dt.deserialize(arguments.drop(1), inspectors.drop(1))).mapN(_ :: _)
+    }
   }
 }
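
The switch from head/tail to take(1)/drop(1) is what makes optional trailing arguments workable: on an arguments array that has run out, take and drop return empty slices instead of throwing, and the Option deserializer (next file) can turn an empty slice into None. A minimal sketch of the difference, using nothing beyond the standard library (GenericUDF as imported above):

    val empty = Array.empty[GenericUDF.DeferredObject]
    empty.take(1) // Array(), safe; an Option deserializer can map this to None
    empty.drop(1) // Array(), safe
    empty.head    // throws NoSuchElementException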
5 changes: 5 additions & 0 deletions core/src/main/scala/com/azavea/hiveless/serializers/UnaryDeserializer.scala
@@ -22,6 +22,7 @@ import org.apache.spark.sql.hive.HivelessInternals.unwrap
 import org.apache.spark.sql.types.Decimal
 import org.apache.spark.unsafe.types.UTF8String
 import cats.Id
+import cats.syntax.apply._
 import org.apache.spark.sql.catalyst.util.ArrayData
 import shapeless.HNil
 
@@ -45,6 +46,10 @@ object UnaryDeserializer extends Serializable {
   implicit def tryUnaryDeserializer[T: UnaryDeserializer[Id, *]]: UnaryDeserializer[Try, T] =
     (arguments, inspectors) => Try(UnaryDeserializer[Id, T].deserialize(arguments, inspectors))
 
+  /** Derive Optional UnaryDeserializers. */
+  implicit def optionalUnaryDeserializer[T: UnaryDeserializer[Id, *]]: UnaryDeserializer[Id, Option[T]] =
+    (arguments, inspectors) => (arguments.headOption, inspectors.headOption).mapN(UnaryDeserializer[Id, T].deserialize)
+
   /** Derivation helper deserializer. */
   implicit val hnilUnaryDeserializer: UnaryDeserializer[Id, HNil] = (_, _) => HNil
 
…
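
The optional deserializer relies on mapN over Option: if either the argument or its inspector is absent, the result is None instead of an exception. A rough illustration of that mapN behavior (cats syntax, placeholder values):

    import cats.syntax.apply._
    (Option(1), Option(2)).mapN(_ + _)         // Some(3)
    (Option(1), Option.empty[Int]).mapN(_ + _) // None; a missing side short-circuits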
1 change: 1 addition & 0 deletions spatial-index/sql/createUDFs.sql
@@ -0,0 +1 @@
+CREATE OR REPLACE FUNCTION ST_partitionCentroid as 'com.azavea.hiveless.spatial.index.ST_PartitionCentroid';
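
With the spatial-index assembly jar on the classpath, the DDL above registers the UDF; only the geometry and zoom arguments are required, and the four Option parameters may be omitted (that is what the Option plumbing above enables). A hypothetical Spark session (table and column names are invented for illustration):

    spark.sql("CREATE OR REPLACE FUNCTION ST_partitionCentroid as 'com.azavea.hiveless.spatial.index.ST_PartitionCentroid'")
    spark.sql("SELECT ST_partitionCentroid(geom, 9) AS partition_key FROM polygons")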
45 changes: 45 additions & 0 deletions spatial-index/src/main/scala/com/azavea/hiveless/spatial/index/ST_PartitionCentroid.scala
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2022 Azavea
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.azavea.hiveless.spatial.index
+
+import com.azavea.hiveless.HUDF
+import com.azavea.hiveless.spatial._
+import geotrellis.layer.{SpatialKey, ZoomedLayoutScheme}
+import geotrellis.vector._
+import geotrellis.proj4.{CRS, LatLng}
+import geotrellis.store.index.zcurve.Z2
+import org.locationtech.jts.geom.Geometry
+
+class ST_PartitionCentroid extends HUDF[(Geometry, Int, Option[CRS], Option[Int], Option[Double], Option[Int]), Long] {
+  val name: String = "st_partitionCentroid"
+  def function = {
+    case (geom: Geometry, zoom: Int, crsOpt: Option[CRS], tileSizeOpt: Option[Int], resolutionThresholdOpt: Option[Double], bitsOpt: Option[Int]) =>
+      // set default values
+      val crs                 = crsOpt.getOrElse(LatLng)
+      val tileSize            = tileSizeOpt.getOrElse(ZoomedLayoutScheme.DEFAULT_TILE_SIZE)
+      val resolutionThreshold = resolutionThresholdOpt.getOrElse(ZoomedLayoutScheme.DEFAULT_RESOLUTION_THRESHOLD)
+      val bits                = bitsOpt.getOrElse(8)
+
+      // compute key
+      val SpatialKey(col, row) = new ZoomedLayoutScheme(crs, tileSize, resolutionThreshold)
+        .levelForZoom(zoom)
+        .layout
+        .mapTransform(geom.extent.center)
+
+      Z2(col, row).z >> bits
+  }
+}
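
For intuition: at zoom z the ZoomedLayoutScheme tiles the CRS extent into a 2^z x 2^z grid, the geometry's centroid selects one (col, row) cell, Z2 bit-interleaves col and row into a Z-order curve index, and the final right shift drops the low bits so that nearby cells share a partition key. A condensed sketch of the same computation, reusing the file's imports (the point is illustrative):

    val scheme = new ZoomedLayoutScheme(LatLng, ZoomedLayoutScheme.DEFAULT_TILE_SIZE, ZoomedLayoutScheme.DEFAULT_RESOLUTION_THRESHOLD)
    val SpatialKey(col, row) = scheme.levelForZoom(9).layout.mapTransform(Point(-77.0, 38.9))
    val partitionKey: Long = Z2(col, row).z >> 8 // default: drop 8 bits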
37 changes: 37 additions & 0 deletions spatial-index/src/main/scala/com/azavea/hiveless/spatial/index/package.scala
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2022 Azavea
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.azavea.hiveless.spatial
+
+import com.azavea.hiveless.serializers.{HConverter, HSerializer, UnaryDeserializer}
+import com.azavea.hiveless.implicits.syntax._
+import cats.Id
+import geotrellis.proj4.CRS
+import org.apache.spark.sql.types.{DataType, StringType}
+
+package object index {
+  implicit def crsConverter: HConverter[CRS] = new HConverter[CRS] {
+    def convert(argument: Any): CRS = CRS.fromString(argument.convert[String])
+  }
+
+  implicit def crsUnaryDeserializer: UnaryDeserializer[Id, CRS] =
+    (arguments, inspectors) => arguments.deserialize[Id, String](inspectors).convert[CRS]
+
+  implicit def crsSerializer: HSerializer[CRS] = new HSerializer[CRS] {
+    def dataType: DataType = StringType
+    def serialize: CRS => Any = crs => crs.toProj4String.serialize
+  }
+}
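
A round-trip sketch of the CRS codec above: a CRS crosses the Hive boundary as its proj4 string (the WGS84 string is just an example):

    import com.azavea.hiveless.implicits.syntax._
    import com.azavea.hiveless.spatial.index._
    val crs      = CRS.fromString("+proj=longlat +datum=WGS84 +no_defs")
    val stored   = crs.serialize        // a proj4 string, via crsSerializer
    val restored = stored.convert[CRS]  // back to a CRS, via crsConverter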
14 changes: 7 additions & 7 deletions spatial/src/main/scala/com/azavea/hiveless/spatial/package.scala
@@ -17,22 +17,22 @@
 package com.azavea.hiveless
 
 import com.azavea.hiveless.serializers.{HConverter, HSerializer, UnaryDeserializer}
+import com.azavea.hiveless.implicits.syntax._
 import cats.Id
 import org.locationtech.jts.geom.Geometry
-import org.apache.spark.sql.jts.GeometryUDT
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.types.DataType
+import org.apache.spark.sql.types.{BinaryType, DataType}
+import org.locationtech.geomesa.spark.jts.util.WKBUtils
 
 package object spatial extends Serializable {
   implicit def geometryConverter[T <: Geometry]: HConverter[T] = new HConverter[T] {
-    def convert(argument: Any): T = GeometryUDT.deserialize(argument).asInstanceOf[T]
+    def convert(argument: Any): T = WKBUtils.read(argument.asInstanceOf[Array[Byte]]).asInstanceOf[T]
   }
 
   implicit def geometryUnaryDeserializer[T <: Geometry: HConverter]: UnaryDeserializer[Id, T] =
-    (arguments, inspectors) => HConverter[T].convert(UnaryDeserializer[Id, InternalRow].deserialize(arguments, inspectors))
+    (arguments, inspectors) => arguments.deserialize[Id, Array[Byte]](inspectors).convert[T]
 
   implicit def geometrySerializer[T <: Geometry]: HSerializer[T] = new HSerializer[T] {
-    def dataType: DataType = GeometryUDT
-    def serialize: Geometry => InternalRow = GeometryUDT.serialize
+    def dataType: DataType = BinaryType
+    def serialize: Geometry => Array[Byte] = WKBUtils.write
   }
 }
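
The geometry codec now moves geometries across the Hive boundary as raw WKB bytes (BinaryType) rather than GeometryUDT's InternalRow encoding. A round-trip sketch (the WKT literal is illustrative):

    import org.locationtech.jts.io.WKTReader
    import org.locationtech.geomesa.spark.jts.util.WKBUtils
    val geom  = new WKTReader().read("POINT (1 2)")
    val bytes = WKBUtils.write(geom) // what geometrySerializer emits
    val again = WKBUtils.read(bytes) // what geometryConverter reads back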
