Skip to content

Commit

Permalink
FSDS - Support path filtering for queries
Browse files Browse the repository at this point in the history
  • Loading branch information
Forrest Feaser committed Dec 12, 2024
1 parent f8b6dc7 commit 3c77df2
Show file tree
Hide file tree
Showing 23 changed files with 327 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ object Bounds {
/**
* Factory methods for creating individual bounds
*/
object Bound {
// single untyped instance that backs every unbounded bound, so no new object is allocated per call
private val unboundedBound = Bound[Any](None, inclusive = false)
/**
* Gets the shared unbounded bound (no value, not inclusive), cast to the requested element type
*/
def unbounded[T]: Bound[T] = unboundedBound.asInstanceOf[Bound[T]]
/**
* Creates an inclusive bound for a value
*
* @param value bound value - wrapped with `Option(...)`, so a null value results in a bound with no value
* @return inclusive bound
*/
def inclusive[T](value: T): Bound[T] = Bound(Option(value), inclusive = true)
}

private val allValues = Bounds(Bound.unbounded, Bound.unbounded)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/***********************************************************************
* Copyright (c) 2013-2024 Commonwealth Computer Research, Inc.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Apache License, Version 2.0
* which accompanies this distribution and is available at
* http://www.opensource.org/licenses/apache2.0.php.
***********************************************************************/

package org.locationtech.geomesa.fs.storage.api

import org.apache.hadoop.fs.PathFilter
import org.geotools.api.filter.Filter

/**
* Creates Hadoop path filters from query filters
*/
trait PathFilterFactory {

/**
* Create a path filter for a query filter
*
* @param filter query filter
* @return a path filter that accepts or rejects individual file paths
*/
def apply(filter: Filter): PathFilter
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/***********************************************************************
* Copyright (c) 2013-2024 Commonwealth Computer Research, Inc.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Apache License, Version 2.0
* which accompanies this distribution and is available at
* http://www.opensource.org/licenses/apache2.0.php.
***********************************************************************/

package org.locationtech.geomesa.fs.storage.api

import java.util.ServiceLoader

/**
* Factory for loading path filter factories. The companion object looks implementations up
* through `java.util.ServiceLoader`
*/
trait PathFilterFactoryFactory {

/**
* Load a path filter factory
*
* @param config factory config options
* @return the path filter factory, or None if this factory does not handle the given config
*/
def load(config: NamedOptions): Option[PathFilterFactory]
}

object PathFilterFactoryFactory {

  import scala.collection.JavaConverters._

  // SPI implementations, looked up through the service loader the first time they are needed
  private lazy val factories = ServiceLoader.load(classOf[PathFilterFactoryFactory]).asScala.toSeq

  /**
   * Create a path filter factory instance via SPI lookup. Registered implementations are
   * tried in order, stopping at the first one that returns a factory for the config
   *
   * @param config factory config options
   * @return the first path filter factory that could be loaded, or None if no implementation handles the config
   */
  def load(config: NamedOptions): Option[PathFilterFactory] = {
    // the iterator keeps evaluation lazy, so implementations after the first match are never invoked
    val attempts = factories.iterator.map(_.load(config))
    attempts.collectFirst { case Some(factory) => factory }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,13 @@ trait StorageMetadata extends Compactable with Closeable {
*/
def leafStorage: Boolean

/**
* The path filter factory used to filter paths during queries, if any
*
* @return path filter factory, or None if no path filtering is configured
*/
def pathFilterFactory: Option[PathFilterFactory]

/**
* Get a previously set key-value pair
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
org.locationtech.geomesa.fs.storage.common.pathfilters.DtgPathFilterFactory$DtgPathFilterFactoryFactory
Original file line number Diff line number Diff line change
Expand Up @@ -127,9 +127,10 @@ abstract class AbstractFileSystemStorage(
fp.partitions.mkString(", "))
reader
}
val pathFilter = metadata.pathFilterFactory.map(_.apply(fp.filter))
// each partition must be read separately, to ensure modifications are handled correctly
fp.partitions.iterator.flatMap { p =>
val files = getFilePaths(p)
fp.partitions.iterator.flatMap { path =>
val files = getFilePaths(path).filter(p => pathFilter.forall(_.accept(p.path)))
if (files.isEmpty) { Iterator.empty } else { Iterator.single(reader -> files) }
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ class FileBasedMetadata(
override val scheme: PartitionScheme = PartitionSchemeFactory.load(sft, meta.scheme)
override val encoding: String = meta.config(Metadata.Encoding)
override val leafStorage: Boolean = meta.config(Metadata.LeafStorage).toBoolean
override val pathFilterFactory: Option[PathFilterFactory] = None

private val kvs = new ConcurrentHashMap[String, String](meta.config.asJava)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import com.typesafe.scalalogging.LazyLogging
import org.apache.commons.dbcp2.{PoolableConnection, PoolingDataSource}
import org.geotools.api.feature.simple.SimpleFeatureType
import org.locationtech.geomesa.fs.storage.api.StorageMetadata.{PartitionMetadata, StorageFile, StorageFileAction}
import org.locationtech.geomesa.fs.storage.api.{Metadata, PartitionScheme, PartitionSchemeFactory, StorageMetadata}
import org.locationtech.geomesa.fs.storage.api.{Metadata, PartitionScheme, PartitionSchemeFactory, PathFilterFactory, StorageMetadata}
import org.locationtech.geomesa.fs.storage.common.metadata.JdbcMetadata.MetadataTable
import org.locationtech.geomesa.utils.io.WithClose
import org.locationtech.geomesa.utils.text.StringSerialization
Expand Down Expand Up @@ -84,6 +84,7 @@ class JdbcMetadata(
override val scheme: PartitionScheme = PartitionSchemeFactory.load(sft, meta.scheme)
override val encoding: String = meta.config(Metadata.Encoding)
override val leafStorage: Boolean = meta.config(Metadata.LeafStorage).toBoolean
override val pathFilterFactory: Option[PathFilterFactory] = None

private val kvs = new ConcurrentHashMap[String, String](meta.config.asJava)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/***********************************************************************
* Copyright (c) 2013-2024 Commonwealth Computer Research, Inc.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Apache License, Version 2.0
* which accompanies this distribution and is available at
* http://www.opensource.org/licenses/apache2.0.php.
***********************************************************************/

package org.locationtech.geomesa.fs.storage.common.pathfilters

import com.typesafe.scalalogging.LazyLogging
import org.apache.hadoop.fs.PathFilter
import org.geotools.api.filter.Filter
import org.locationtech.geomesa.filter.Bounds.Bound
import org.locationtech.geomesa.filter.{Bounds, FilterHelper}
import org.locationtech.geomesa.fs.storage.api.{NamedOptions, PathFilterFactory, PathFilterFactoryFactory}

import java.time.format.DateTimeFormatter
import java.time.{ZoneOffset, ZonedDateTime}
import java.util.regex.Pattern
import scala.concurrent.duration.Duration
import scala.util.control.NonFatal

/**
 * Path filter factory that accepts or rejects files based on a date parsed out of each file name.
 *
 * The parsed date is widened by the buffer on both sides, and the file is accepted if the
 * resulting window intersects any of the intervals extracted from the query filter.
 *
 * @param attribute name of the attribute used to extract intervals from the query filter
 * @param pattern regex that must match the whole file name - capture group 1 is the date text
 * @param format formatter used to parse the captured date text
 * @param buffer amount of time subtracted from and added to the parsed date before comparing it
 *               to the query intervals
 */
class DtgPathFilterFactory(attribute: String, pattern: Pattern, format: DateTimeFormatter, buffer: Duration)
    extends PathFilterFactory with LazyLogging {

  /**
   * Create a path filter for a query filter
   *
   * @param filter query filter
   * @return path filter that accepts a path if its buffered date intersects the query intervals, if the
   *         query filter does not constrain the attribute at all, or if the path can't be evaluated
   */
  def apply(filter: Filter): PathFilter = {
    val filterIntervals = FilterHelper.extractIntervals(filter, attribute, handleExclusiveBounds = true)
    // a filter with no constraint on the attribute (e.g. INCLUDE) yields no intervals - without this
    // check, `exists` below would be false for every path and the query would silently read no files
    // NOTE(review): relies on FilterValues.isEmpty being false for disjoint filters - confirm
    val unconstrained = filterIntervals.isEmpty
    path => unconstrained || {
      try {
        val time = parseDtg(path.getName).toInstant
        val millis = buffer.toMillis
        val lower = ZonedDateTime.ofInstant(time.minusMillis(millis), ZoneOffset.UTC)
        val upper = ZonedDateTime.ofInstant(time.plusMillis(millis), ZoneOffset.UTC)
        val buffered = Bounds(Bound.inclusive(lower), Bound.inclusive(upper))
        filterIntervals.exists(bounds => bounds.intersects(buffered))
      } catch {
        case NonFatal(ex) =>
          // best-effort: a path that can't be evaluated is kept, so that data is never dropped by mistake
          logger.warn(s"Failed to evaluate filter for path '${path.getName}'", ex)
          true
      }
    }
  }

  /**
   * Parse the date out of a file name
   *
   * @param name file name (without any parent directories)
   * @return parsed date
   * @throws IllegalArgumentException if the name is null, does not fully match the pattern, or the pattern
   *                                  has no capture group
   */
  private def parseDtg(name: String): ZonedDateTime = {
    Option(name)
      .map(pattern.matcher)
      .filter(_.matches)
      .filter(_.groupCount > 0)
      .map(_.group(1))
      .map(ZonedDateTime.parse(_, format))
      .getOrElse {
        throw new IllegalArgumentException(s"Failed to parse ${classOf[ZonedDateTime].getName} " +
          s"from file name '$name' for pattern '$pattern' and format '$format'")
      }
  }
}

object DtgPathFilterFactory extends LazyLogging {

  // name used to select this path filter in the configuration
  val Name = "dtg"

  // option keys read from the path filter configuration
  object Config {
    val Attribute = "attribute"
    val Pattern = "pattern"
    val Format = "format"
    val Buffer = "buffer"
  }

  /**
   * SPI entry point that builds a [[DtgPathFilterFactory]] from named options
   */
  class DtgPathFilterFactoryFactory extends PathFilterFactoryFactory {

    /**
     * Load a dtg path filter factory
     *
     * @param config factory config options
     * @return None if the config name is not `dtg`, otherwise the configured factory
     * @throws IllegalArgumentException if any of the four options is missing
     */
    override def load(config: NamedOptions): Option[PathFilterFactory] = {
      // looks up a mandatory option, failing with a descriptive message if it is absent
      def required(key: String, description: String): String = {
        val value = config.options.getOrElse(key, null)
        require(value != null, s"$Name path filter requires a $description config '$key'")
        value
      }

      if (config.name == Name) {
        // options are validated in this order, so the first missing one is the one reported
        val attribute = required(Config.Attribute, "dtg attribute")
        val regex = required(Config.Pattern, "dtg pattern")
        val dateFormat = required(Config.Format, "dtg format")
        val bufferText = required(Config.Buffer, "buffer duration")

        val compiled = Pattern.compile(regex)
        // file name dates carry no zone of their own, so they are parsed as UTC
        val formatter = DateTimeFormatter.ofPattern(dateFormat).withZone(ZoneOffset.UTC)
        val duration = Duration(bufferText)
        Some(new DtgPathFilterFactory(attribute, compiled, formatter, duration))
      } else {
        None
      }
    }
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/***********************************************************************
* Copyright (c) 2013-2024 Commonwealth Computer Research, Inc.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Apache License, Version 2.0
* which accompanies this distribution and is available at
* http://www.opensource.org/licenses/apache2.0.php.
***********************************************************************/

package org.locationtech.geomesa.fs.storage.common.pathfilters

import org.apache.hadoop.fs.Path
import org.geotools.filter.text.ecql.ECQL
import org.junit.runner.RunWith
import org.locationtech.geomesa.fs.storage.api.{NamedOptions, PathFilterFactory, PathFilterFactoryFactory}
import org.specs2.mutable.Specification
import org.specs2.runner.JUnitRunner

@RunWith(classOf[JUnitRunner])
class DtgPathFilterFactoryTest extends Specification {

  import org.locationtech.geomesa.fs.storage.common.pathfilters.DtgPathFilterFactory.Config._

  "DtgPathFilterFactory" should {
    "parse, format, buffer, and filter a dtg from a file path" in {
      val dtg = "dtg"
      val options = Map(
        Attribute -> dtg,
        Pattern -> "^data-(.*)\\..*$",
        Format -> "yyyyMMddHHmm",
        Buffer -> "6 hours"
      )

      // the factory is looked up through SPI, the same way the storage loads it
      val loaded = PathFilterFactoryFactory.load(NamedOptions(DtgPathFilterFactory.Name, options))
      loaded must beSome { factory: PathFilterFactory =>
        factory must haveClass[DtgPathFilterFactory]
      }

      // the equality clauses make both end points of the DURING interval inclusive
      val ecql = Seq(
        s"$dtg DURING 2024-12-10T00:00:00Z/2024-12-11T00:00:00Z",
        s"$dtg = 2024-12-10T00:00:00Z",
        s"$dtg = 2024-12-11T00:00:00Z"
      ).mkString(" OR ")
      val pathFilter = loaded.get.apply(ECQL.toFilter(ecql))

      def accepts(timestamp: String): Boolean =
        pathFilter.accept(new Path(s"/geomesa/fs/data-$timestamp.csv"))

      // more than 6 hours before the interval
      accepts("202412080000") must beFalse
      // 2 hours before the interval, within the buffer
      accepts("202412092200") must beTrue
      // exactly on the end of the interval
      accepts("202412110000") must beTrue
      // 6 hours after the interval, the buffer just touches the inclusive end
      accepts("202412110600") must beTrue
      // 10 hours after the interval
      accepts("202412111000") must beFalse
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ package org.locationtech.geomesa.fs.storage.converter

import com.typesafe.scalalogging.StrictLogging
import org.apache.commons.compress.archivers.ArchiveStreamFactory
import org.apache.hadoop.fs.{FileContext, FileSystem, Path}
import org.apache.hadoop.fs.{FileSystem, Path}
import org.geotools.api.feature.simple.{SimpleFeature, SimpleFeatureType}
import org.geotools.api.filter.Filter
import org.locationtech.geomesa.convert.EvaluationContext
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ class ConverterMetadata(
context: FileSystemContext,
val sft: SimpleFeatureType,
val scheme: PartitionScheme,
val leafStorage: Boolean
val leafStorage: Boolean,
val pathFilterFactory: Option[PathFilterFactory]
) extends StorageMetadata {

private val dirty = new AtomicBoolean(false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,20 @@ class ConverterMetadataFactory extends StorageMetadataFactory with LazyLogging {
deprecated.getOrElse(true)
}

Some(new ConverterMetadata(context, sft, scheme, leafStorage))
val pathFilterFactoryOpts =
context.conf.getValByRegex(Pattern.quote(PathFilterOptsPrefix) + ".*").asScala.map {
case (k, v) => k.substring(PathFilterOptsPrefix.length) -> v
}

val pathFilterFactory = Option(context.conf.get(PathFilterName)).flatMap { name =>
val factory = PathFilterFactoryFactory.load(NamedOptions(name, pathFilterFactoryOpts.toMap))
if (factory.isEmpty) {
logger.warn(s"Failed to load ${classOf[PathFilterFactory].getName} for config '$name'")
}
factory
}

Some(new ConverterMetadata(context, sft, scheme, leafStorage, pathFilterFactory))
} catch {
case e: IllegalArgumentException => logger.warn(s"Couldn't create converter storage metadata: $e", e); None
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,6 @@ object ConverterStorageFactory {
val LeafStorageParam = "fs.options.leaf-storage"
val PartitionSchemeParam = "fs.partition-scheme.name"
val PartitionOptsPrefix = "fs.partition-scheme.opts."
val PathFilterName = "fs.path-filter.name"
val PathFilterOptsPrefix = "fs.path-filter.opts."
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
ID,Name,Age,Dtg,Lon,Dat
0,Harry,17,2024-12-11T01:00:00.000Z,0.0,0.0
1,Hermione,18,2024-12-11T11:00:00.000Z,0.0,0.0
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
ID,Name,Age,Dtg,Lon,Dat
2,Ronald,17,2024-12-11T07:00:00.000Z,0.0,0.0
3,Draco,18,2024-12-11T11:00:00.000Z,0.0,0.0
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
ID,Name,Age,Dtg,Lon,Dat
4,Neville,17,2024-12-11T13:00:00.000Z,0.0,0.0
5,Rubeus,43,2024-12-11T08:00:00.000Z,0.0,0.0
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
ID,Name,Age,Dtg,Lon,Dat
6,Severus,52,2024-12-11T19:00:00.000Z,0.0,0.0
7,Alfred,78,2024-12-11T23:00:00.000Z,0.0,0.0
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
ID,Name,Age,Dtg,Lon,Dat
8,Dean,17,2024-12-11T23:50:00.000Z,0.0,0.0
9,Minerva,17,2024-12-12T00:30:00.000Z,0.0,0.0
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
ID,Name,Age,Dtg,Lon,Dat
10,Luna,17,2024-12-12T03:00:00.000Z,0.0,0.0
11,Dudley,17,2024-12-12T05:00:00.000Z,0.0,0.0
Loading

0 comments on commit 3c77df2

Please sign in to comment.