Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GEOMESA-3426 FSDS - Support path filtering for converter queries #3245

Merged
merged 5 commits into from
Dec 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions docs/user/filesystem/index_config.rst
Original file line number Diff line number Diff line change
Expand Up @@ -250,3 +250,25 @@ Observers can be specified through the user data key ``geomesa.fs.observers``:
sft.setObservers(factories)
// or set directly in the user data as a comma-delimited string
sft.getUserData.put("geomesa.fs.observers", factories.mkString(","))

Configuring Path Filters
------------------------

.. note::

Path filtering is supported for ``converter`` encoding only.

The FSDS can filter paths within a partition for more granular control of queries. Path filtering is configured
through the user data key ``geomesa.fs.path-filter.name``.

Currently, the only implementation is the ``dtg`` path filter, whose purpose is to parse a datetime from the given
path and compare it to the query filter to include or exclude the file from the query. The following options are
required for the ``dtg`` path filter, configured through the key ``geomesa.fs.path-filter.opts``:

* ``attribute`` - The ``Date`` attribute in the query to compare against.
* ``pattern`` - The regular expression, with a single capturing group, to extract a datetime string from the path.
* ``format`` - The datetime formatting pattern to parse a date from the regex capture.
* ``buffer`` - The duration by which to buffer the bounds of the parsed datetime within the current partition. To
  buffer time across partitions, see the ``receipt-time`` partition scheme.

Custom path filters can be loaded via SPI.
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ object Bounds {
object Bound {
  // single shared "no bound" instance; the cast is safe because the wrapped value is None
  private val none = Bound[Any](None, inclusive = false)
  /** An open-ended bound (no limiting value). */
  def unbounded[T]: Bound[T] = none.asInstanceOf[Bound[T]]
  /** An inclusive bound on the given value. */
  def inclusive[T](value: T): Bound[T] = Bound(Option(value), inclusive = true)
}

// bounds that are unbounded on both ends, i.e. match all values
private val allValues = Bounds(Bound.unbounded, Bound.unbounded)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
org.locationtech.geomesa.fs.storage.converter.pathfilter.DtgPathFiltering$DtgPathFilteringFactory
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,14 @@ package org.locationtech.geomesa.fs.storage.converter

import com.typesafe.scalalogging.StrictLogging
import org.apache.commons.compress.archivers.ArchiveStreamFactory
import org.apache.hadoop.fs.{FileContext, FileSystem, Path}
import org.apache.hadoop.fs.{FileSystem, Path, PathFilter}
import org.geotools.api.feature.simple.{SimpleFeature, SimpleFeatureType}
import org.geotools.api.filter.Filter
import org.locationtech.geomesa.convert.EvaluationContext
import org.locationtech.geomesa.convert2.SimpleFeatureConverter
import org.locationtech.geomesa.features.{ScalaSimpleFeature, TransformSimpleFeature}
import org.locationtech.geomesa.fs.storage.common.AbstractFileSystemStorage.FileSystemPathReader
import org.locationtech.geomesa.fs.storage.converter.pathfilter.PathFiltering
import org.locationtech.geomesa.utils.collection.CloseableIterator
import org.locationtech.geomesa.utils.hadoop.HadoopDelegate.{HadoopFileHandle, HadoopTarHandle, HadoopZipHandle}
import org.locationtech.geomesa.utils.io.PathUtils
/**
 * Path reader that parses simple features out of raw files using a SimpleFeatureConverter.
 *
 * @param fs hadoop file system used to open paths
 * @param converter converter used to parse features from the file contents
 * @param filter optional query filter applied to converted features
 * @param transform optional (definition, schema) pair for relational projection of results
 * @param pathFiltering optional strategy for skipping files based on their path
 */
class ConverterFileSystemReader(
    fs: FileSystem,
    converter: SimpleFeatureConverter,
    filter: Option[Filter],
    transform: Option[(String, SimpleFeatureType)],
    pathFiltering: Option[PathFiltering]
  ) extends FileSystemPathReader with StrictLogging {

  import ArchiveStreamFactory.{JAR, TAR, ZIP}

  // resolve the hadoop PathFilter once - present only when both a strategy and a query filter exist
  private lazy val pathFilter: Option[PathFilter] = pathFiltering.flatMap(pf => filter.map(pf.apply))

/**
 * Reads features from the given path by running its contents through the configured converter.
 *
 * If a path filter is configured and rejects the path, the file is skipped entirely and an
 * empty iterator is returned. Tar/zip/jar archives are opened entry-by-entry. Any processing
 * error is logged and yields an empty iterator instead of failing the whole query (best-effort).
 *
 * @param path the file to read
 * @return converted features, after applying the optional filter and transform
 */
override def read(path: Path): CloseableIterator[SimpleFeature] = {
  if (pathFilter.forall(_.accept(path))) {
    logger.debug(s"Opening file $path")
    val iter = try {
      // choose an archive-aware handle based on the (uncompressed) file extension
      val handle = PathUtils.getUncompressedExtension(path.getName).toLowerCase(Locale.US) match {
        case TAR => new HadoopTarHandle(fs, path)
        case ZIP | JAR => new HadoopZipHandle(fs, path)
        case _ => new HadoopFileHandle(fs, path)
      }
      handle.open.flatMap { case (name, is) =>
        val params = EvaluationContext.inputFileParam(name.getOrElse(handle.path))
        converter.process(is, converter.createEvaluationContext(params))
      }
    } catch {
      // log and skip unreadable files rather than propagating the failure
      case NonFatal(e) => logger.error(s"Error processing uri '$path'", e); CloseableIterator.empty
    }
    transformed(filtered(iter))
  } else {
    CloseableIterator.empty
  }
}

private def filtered(in: CloseableIterator[SimpleFeature]): CloseableIterator[SimpleFeature] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,12 @@ import org.locationtech.geomesa.fs.storage.common.AbstractFileSystemStorage
import org.locationtech.geomesa.fs.storage.common.AbstractFileSystemStorage.FileSystemPathReader
import org.locationtech.geomesa.fs.storage.common.observer.FileSystemObserver
import org.locationtech.geomesa.fs.storage.common.utils.PathCache
import org.locationtech.geomesa.fs.storage.converter.pathfilter.PathFiltering

/**
 * Read-oriented storage implementation backed by a SimpleFeatureConverter.
 *
 * @param context file system context for the storage root
 * @param metadata storage metadata (schema, partition scheme, etc.)
 * @param converter converter used to parse features from raw files
 * @param pathFiltering optional strategy for skipping files by path during queries
 */
class ConverterStorage(
    context: FileSystemContext,
    metadata: StorageMetadata,
    converter: SimpleFeatureConverter,
    pathFiltering: Option[PathFiltering])
  extends AbstractFileSystemStorage(context, metadata, "") {

  // TODO close converter...
Expand All @@ -36,7 +40,7 @@ class ConverterStorage(context: FileSystemContext, metadata: StorageMetadata, co
// creates a path reader for a single query, threading through the optional path-filtering strategy
override protected def createReader(
    filter: Option[Filter],
    transform: Option[(String, SimpleFeatureType)]): FileSystemPathReader = {
  new ConverterFileSystemReader(context.fs, converter, filter, transform, pathFiltering)
}

override def getFilePaths(partition: String): Seq[StorageFilePath] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,15 @@ import com.typesafe.scalalogging.LazyLogging
import org.locationtech.geomesa.convert.{ConfArgs, ConverterConfigResolver}
import org.locationtech.geomesa.convert2.SimpleFeatureConverter
import org.locationtech.geomesa.fs.storage.api._
import org.locationtech.geomesa.fs.storage.converter.ConverterStorageFactory.{ConverterConfigParam, ConverterNameParam}
import org.locationtech.geomesa.fs.storage.converter.ConverterStorageFactory._
import org.locationtech.geomesa.fs.storage.converter.pathfilter.{PathFiltering, PathFilteringFactory}

import java.util.regex.Pattern

// Factory creating ConverterStorage instances for the "converter" encoding.
class ConverterStorageFactory extends FileSystemStorageFactory with LazyLogging {

import scala.collection.JavaConverters._

// storage encoding identifier used to select this factory
override val encoding: String = "converter"

override def apply(context: FileSystemContext, metadata: StorageMetadata): FileSystemStorage = {
Expand All @@ -29,9 +34,22 @@ class ConverterStorageFactory extends FileSystemStorageFactory with LazyLogging
}
SimpleFeatureConverter(metadata.sft, converterConfig)
}
// NOTE(review): the next two lines look like pre-change diff residue (4-arg constructor call),
// superseded by the 5-arg ConverterStorage call below - TODO reconcile against the merged source
new ConverterStorage(context, metadata, converter)
}

// collect path-filter options from the hadoop conf, stripping the common key prefix
val pathFilteringOpts =
context.conf.getValByRegex(Pattern.quote(PathFilterOptsPrefix) + ".*").asScala.map {
case (k, v) => k.substring(PathFilterOptsPrefix.length) -> v
}

// resolve the optional path-filtering strategy by name via SPI; fail fast if the name doesn't load
val pathFiltering = Option(context.conf.get(PathFilterName)).flatMap { name =>
val factory = PathFilteringFactory.load(NamedOptions(name, pathFilteringOpts.toMap))
if (factory.isEmpty) {
throw new IllegalArgumentException(s"Failed to load ${classOf[PathFiltering].getName} for config '$name'")
}
factory
}

new ConverterStorage(context, metadata, converter, pathFiltering)
}
}

// Hadoop configuration keys recognized by the converter storage factory.
object ConverterStorageFactory {
Expand All @@ -43,4 +61,6 @@ object ConverterStorageFactory {
val LeafStorageParam = "fs.options.leaf-storage"
val PartitionSchemeParam = "fs.partition-scheme.name"
val PartitionOptsPrefix = "fs.partition-scheme.opts."
// path filtering: strategy name, plus key prefix for strategy-specific options
val PathFilterName = "fs.path-filter.name"
val PathFilterOptsPrefix = "fs.path-filter.opts."
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/***********************************************************************
* Copyright (c) 2013-2024 Commonwealth Computer Research, Inc.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Apache License, Version 2.0
* which accompanies this distribution and is available at
* http://www.opensource.org/licenses/apache2.0.php.
***********************************************************************/

package org.locationtech.geomesa.fs.storage.converter.pathfilter

import com.typesafe.scalalogging.LazyLogging
import org.apache.hadoop.fs.PathFilter
import org.geotools.api.filter.Filter
import org.locationtech.geomesa.filter.Bounds.Bound
import org.locationtech.geomesa.filter.{Bounds, FilterHelper}
import org.locationtech.geomesa.fs.storage.api.NamedOptions

import java.time.format.DateTimeFormatter
import java.time.{ZoneOffset, ZonedDateTime}
import java.util.regex.Pattern
import scala.concurrent.duration.Duration
import scala.util.control.NonFatal

/**
 * Filters paths by parsing a datetime out of the file name and intersecting it
 * (widened by a buffer) with the intervals extracted from the query filter.
 *
 * @param attribute the Date attribute in the query to compare against
 * @param pattern regex with a single capturing group to pull a datetime string from the file name
 * @param format formatter used to parse the captured datetime string
 * @param buffer duration used to widen the parsed datetime on both sides
 */
class DtgPathFiltering(attribute: String, pattern: Pattern, format: DateTimeFormatter, buffer: Duration)
    extends PathFiltering with LazyLogging {

  def apply(filter: Filter): PathFilter = {
    // extract the query's time intervals once, up front
    val intervals = FilterHelper.extractIntervals(filter, attribute, handleExclusiveBounds = true)
    path => try {
      val instant = parseDtg(path.getName).toInstant
      val bufferMillis = buffer.toMillis
      // widen the file's timestamp by the buffer on both sides before intersecting
      val lo = ZonedDateTime.ofInstant(instant.minusMillis(bufferMillis), ZoneOffset.UTC)
      val hi = ZonedDateTime.ofInstant(instant.plusMillis(bufferMillis), ZoneOffset.UTC)
      val window = Bounds(Bound.inclusive(lo), Bound.inclusive(hi))
      val included = intervals.exists(_.intersects(window))
      logger.whenDebugEnabled {
        val verdict = if (included) { "Including" } else { "Excluding" }
        logger.debug(s"$verdict path ${path.getName} for filter $filter")
      }
      included
    } catch {
      // if we can't evaluate the path, keep the file - a false negative would drop data
      case NonFatal(ex) =>
        logger.warn(s"Failed to evaluate filter for path '${path.getName}'", ex)
        true
    }
  }

  // parses a ZonedDateTime from the first capture group of the configured pattern
  private def parseDtg(name: String): ZonedDateTime = {
    val capture = Option(name).map(pattern.matcher).collect {
      case m if m.matches && m.groupCount > 0 => m.group(1)
    }
    capture match {
      case Some(dtg) => ZonedDateTime.parse(dtg, format)
      case None =>
        throw new IllegalArgumentException(s"Failed to parse ${classOf[ZonedDateTime].getName} " +
          s"from file name '$name' for pattern '$pattern' and format '$format'")
    }
  }

  override def toString: String =
    s"${this.getClass.getName}(attribute = $attribute, pattern = $pattern, format = $format, buffer = $buffer)"
}

object DtgPathFiltering extends LazyLogging {

  /** SPI name used to select this path filter. */
  val Name = "dtg"

  /** Configuration option keys for the dtg path filter. */
  object Config {
    val Attribute = "attribute"
    val Pattern = "pattern"
    val Format = "format"
    val Buffer = "buffer"
  }

  /** SPI factory - returns a DtgPathFiltering when the config name matches [[Name]]. */
  class DtgPathFilteringFactory extends PathFilteringFactory {
    override def load(config: NamedOptions): Option[PathFiltering] = {
      if (config.name != Name) { None } else {
        // all four options are mandatory - fail fast with a descriptive error if any is missing
        val attribute = mandatory(config, Config.Attribute, "a dtg attribute")
        val patternConfig = mandatory(config, Config.Pattern, "a dtg pattern")
        val formatConfig = mandatory(config, Config.Format, "a dtg format")
        val bufferConfig = mandatory(config, Config.Buffer, "a buffer duration")

        val pattern = Pattern.compile(patternConfig)
        val format = DateTimeFormatter.ofPattern(formatConfig).withZone(ZoneOffset.UTC)
        val buffer = Duration(bufferConfig)
        val pathFiltering = new DtgPathFiltering(attribute, pattern, format, buffer)
        logger.info(s"Loaded PathFiltering: $pathFiltering")
        Some(pathFiltering)
      }
    }
  }

  // looks up a required option, throwing IllegalArgumentException (as `require` did) when absent -
  // avoids the null-sentinel + require(x != null) anti-pattern of the original
  private def mandatory(config: NamedOptions, key: String, hint: String): String =
    config.options.getOrElse(key,
      throw new IllegalArgumentException(s"$Name path filter requires $hint config '$key'"))
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/***********************************************************************
* Copyright (c) 2013-2024 Commonwealth Computer Research, Inc.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Apache License, Version 2.0
* which accompanies this distribution and is available at
* http://www.opensource.org/licenses/apache2.0.php.
***********************************************************************/

package org.locationtech.geomesa.fs.storage.converter.pathfilter

import org.apache.hadoop.fs.PathFilter
import org.geotools.api.filter.Filter

/**
 * Strategy for deriving a hadoop PathFilter from a geotools query filter, so that
 * files which cannot match the query can be skipped without being read.
 */
trait PathFiltering {
// returns a PathFilter that accepts only paths relevant to the given query filter
def apply(filter: Filter): PathFilter
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/***********************************************************************
* Copyright (c) 2013-2024 Commonwealth Computer Research, Inc.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Apache License, Version 2.0
* which accompanies this distribution and is available at
* http://www.opensource.org/licenses/apache2.0.php.
***********************************************************************/

package org.locationtech.geomesa.fs.storage.converter.pathfilter

import org.locationtech.geomesa.fs.storage.api.NamedOptions

import java.util.ServiceLoader

/**
 * SPI factory for PathFiltering strategies. Implementations should return None when the
 * config name does not match the strategy they provide.
 */
trait PathFilteringFactory {
// returns a PathFiltering for the named config, or None if this factory doesn't handle it
def load(config: NamedOptions): Option[PathFiltering]
}

object PathFilteringFactory {

  import scala.collection.JavaConverters._

  // SPI lookup of registered factories, performed once on first use
  private lazy val factories = ServiceLoader.load(classOf[PathFilteringFactory]).asScala.toSeq

  /** Returns the first strategy produced by any registered factory for the given config, if any. */
  def load(config: NamedOptions): Option[PathFiltering] =
    factories.iterator.map(_.load(config)).collectFirst { case Some(pf) => pf }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
ID,Name,Age,Dtg,Lon,Dat
0,Harry,17,2024-12-11T01:00:00.000Z,0.0,0.0
1,Hermione,18,2024-12-11T11:00:00.000Z,0.0,0.0
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
ID,Name,Age,Dtg,Lon,Dat
2,Ronald,17,2024-12-11T07:00:00.000Z,0.0,0.0
3,Draco,18,2024-12-11T11:00:00.000Z,0.0,0.0
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
ID,Name,Age,Dtg,Lon,Dat
4,Neville,17,2024-12-11T13:00:00.000Z,0.0,0.0
5,Rubeus,43,2024-12-11T08:00:00.000Z,0.0,0.0
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
ID,Name,Age,Dtg,Lon,Dat
6,Severus,52,2024-12-11T19:00:00.000Z,0.0,0.0
7,Alfred,78,2024-12-11T23:00:00.000Z,0.0,0.0
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
ID,Name,Age,Dtg,Lon,Dat
8,Dean,17,2024-12-11T23:50:00.000Z,0.0,0.0
9,Minerva,57,2024-12-12T00:30:00.000Z,0.0,0.0
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
ID,Name,Age,Dtg,Lon,Dat
10,Luna,17,2024-12-12T03:00:00.000Z,0.0,0.0
11,Dudley,19,2024-12-12T05:00:00.000Z,0.0,0.0
Loading
Loading