From 3c77df25b55b298ec040cef49f65f9dd034c0d78 Mon Sep 17 00:00:00 2001 From: Forrest Feaser Date: Thu, 12 Dec 2024 16:25:42 -0500 Subject: [PATCH] FSDS - Support path filtering for queries --- .../locationtech/geomesa/filter/Bounds.scala | 1 + .../fs/storage/api/PathFilterFactory.scala | 16 +++ .../api/PathFilterFactoryFactory.scala | 40 +++++++ .../fs/storage/api/StorageMetadata.scala | 7 ++ ...sa.fs.storage.api.PathFilterFactoryFactory | 1 + .../common/AbstractFileSystemStorage.scala | 5 +- .../common/metadata/FileBasedMetadata.scala | 1 + .../common/metadata/JdbcMetadata.scala | 3 +- .../pathfilters/DtgPathFilterFactory.scala | 87 +++++++++++++++ .../DtgPathFilterFactoryTest.scala | 55 ++++++++++ .../converter/ConverterFileSystemReader.scala | 2 +- .../storage/converter/ConverterMetadata.scala | 3 +- .../converter/ConverterMetadataFactory.scala | 15 ++- .../converter/ConverterStorageFactory.scala | 2 + .../2023/01/18/example.tgz | Bin .../2024/12/11/data-202412110600.csv | 3 + .../2024/12/11/data-202412111200.csv | 3 + .../2024/12/11/data-202412111800.csv | 3 + .../2024/12/11/data-202412112330.csv | 3 + .../2024/12/12/data-202412120100.csv | 3 + .../2024/12/12/data-202412120600.csv | 3 + .../ConverterFileSystemStorageTest.scala | 103 +++++++++++++----- .../tools/compact/PartitionInputFormat.scala | 6 +- 23 files changed, 327 insertions(+), 38 deletions(-) create mode 100644 geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-api/src/main/scala/org/locationtech/geomesa/fs/storage/api/PathFilterFactory.scala create mode 100644 geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-api/src/main/scala/org/locationtech/geomesa/fs/storage/api/PathFilterFactoryFactory.scala create mode 100644 geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/resources/META-INF/services/org.locationtech.geomesa.fs.storage.api.PathFilterFactoryFactory create mode 100644 geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/pathfilters/DtgPathFilterFactory.scala create mode 100644 geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/test/scala/org/locationtech/geomesa/fs/storage/common/pathfilters/DtgPathFilterFactoryTest.scala rename geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/test/resources/{example-convert-test => example-convert-test-1}/2023/01/18/example.tgz (100%) create mode 100644 geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/test/resources/example-convert-test-2/2024/12/11/data-202412110600.csv create mode 100644 geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/test/resources/example-convert-test-2/2024/12/11/data-202412111200.csv create mode 100644 geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/test/resources/example-convert-test-2/2024/12/11/data-202412111800.csv create mode 100644 geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/test/resources/example-convert-test-2/2024/12/11/data-202412112330.csv create mode 100644 geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/test/resources/example-convert-test-2/2024/12/12/data-202412120100.csv create mode 100644 geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/test/resources/example-convert-test-2/2024/12/12/data-202412120600.csv diff --git a/geomesa-filter/src/main/scala/org/locationtech/geomesa/filter/Bounds.scala b/geomesa-filter/src/main/scala/org/locationtech/geomesa/filter/Bounds.scala index 0f9829466415..0ef139cbb716 100644 --- a/geomesa-filter/src/main/scala/org/locationtech/geomesa/filter/Bounds.scala +++ b/geomesa-filter/src/main/scala/org/locationtech/geomesa/filter/Bounds.scala @@ -119,6 +119,7 @@ object Bounds { object Bound { private val unboundedBound = Bound[Any](None, inclusive = false) def unbounded[T]: Bound[T] = unboundedBound.asInstanceOf[Bound[T]] + def inclusive[T](value: T): Bound[T] = Bound(Option(value), inclusive = true) } private val allValues = Bounds(Bound.unbounded, Bound.unbounded) diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-api/src/main/scala/org/locationtech/geomesa/fs/storage/api/PathFilterFactory.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-api/src/main/scala/org/locationtech/geomesa/fs/storage/api/PathFilterFactory.scala new file mode 100644 index 000000000000..5b32252e7833 --- /dev/null +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-api/src/main/scala/org/locationtech/geomesa/fs/storage/api/PathFilterFactory.scala @@ -0,0 +1,16 @@ +/*********************************************************************** + * Copyright (c) 2013-2024 Commonwealth Computer Research, Inc. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Apache License, Version 2.0 + * which accompanies this distribution and is available at + * http://www.opensource.org/licenses/apache2.0.php. + ***********************************************************************/ + +package org.locationtech.geomesa.fs.storage.api + +import org.apache.hadoop.fs.PathFilter +import org.geotools.api.filter.Filter + +trait PathFilterFactory { + def apply(filter: Filter): PathFilter +} diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-api/src/main/scala/org/locationtech/geomesa/fs/storage/api/PathFilterFactoryFactory.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-api/src/main/scala/org/locationtech/geomesa/fs/storage/api/PathFilterFactoryFactory.scala new file mode 100644 index 000000000000..98d22c535c85 --- /dev/null +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-api/src/main/scala/org/locationtech/geomesa/fs/storage/api/PathFilterFactoryFactory.scala @@ -0,0 +1,40 @@ +/*********************************************************************** + * Copyright (c) 2013-2024 Commonwealth Computer Research, Inc. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Apache License, Version 2.0 + * which accompanies this distribution and is available at + * http://www.opensource.org/licenses/apache2.0.php. + ***********************************************************************/ + +package org.locationtech.geomesa.fs.storage.api + +import java.util.ServiceLoader + +/** + * Factory for loading path filters + */ +trait PathFilterFactoryFactory { + + /** + * Load a path filter factory + * + * @param config factory config options + * @return + */ + def load(config: NamedOptions): Option[PathFilterFactory] +} + +object PathFilterFactoryFactory { + + import scala.collection.JavaConverters._ + + private lazy val factories = ServiceLoader.load(classOf[PathFilterFactoryFactory]).asScala.toSeq + + /** + * Create a path filter factory instance via SPI lookup + * + * @param config factory config options + * @return + */ + def load(config: NamedOptions): Option[PathFilterFactory] = factories.toStream.flatMap(_.load(config)).headOption +} diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-api/src/main/scala/org/locationtech/geomesa/fs/storage/api/StorageMetadata.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-api/src/main/scala/org/locationtech/geomesa/fs/storage/api/StorageMetadata.scala index be80550dae4a..7f6189207d7e 100644 --- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-api/src/main/scala/org/locationtech/geomesa/fs/storage/api/StorageMetadata.scala +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-api/src/main/scala/org/locationtech/geomesa/fs/storage/api/StorageMetadata.scala @@ -53,6 +53,13 @@ trait StorageMetadata extends Compactable with Closeable { */ def leafStorage: Boolean + /** + * The path filter factory used to filter paths during queries + * + * @return path filter factory + */ + def pathFilterFactory: Option[PathFilterFactory] + /** * Get a previously set key-value pair * diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/resources/META-INF/services/org.locationtech.geomesa.fs.storage.api.PathFilterFactoryFactory b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/resources/META-INF/services/org.locationtech.geomesa.fs.storage.api.PathFilterFactoryFactory new file mode 100644 index 000000000000..6f908f6272d9 --- /dev/null +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/resources/META-INF/services/org.locationtech.geomesa.fs.storage.api.PathFilterFactoryFactory @@ -0,0 +1 @@ +org.locationtech.geomesa.fs.storage.common.pathfilters.DtgPathFilterFactory$DtgPathFilterFactoryFactory diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/AbstractFileSystemStorage.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/AbstractFileSystemStorage.scala index 5fcfb47ac9cf..74f8940c0373 100644 --- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/AbstractFileSystemStorage.scala +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/AbstractFileSystemStorage.scala @@ -127,9 +127,10 @@ abstract class AbstractFileSystemStorage( fp.partitions.mkString(", ")) reader } + val pathFilter = metadata.pathFilterFactory.map(_.apply(fp.filter)) // each partition must be read separately, to ensure modifications are handled correctly - fp.partitions.iterator.flatMap { p => - val files = getFilePaths(p) + fp.partitions.iterator.flatMap { path => + val files = getFilePaths(path).filter(p => pathFilter.forall(_.accept(p.path))) if (files.isEmpty) { Iterator.empty } else { Iterator.single(reader -> files) } } } diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/metadata/FileBasedMetadata.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/metadata/FileBasedMetadata.scala index e8ea0f492ecc..fee378b54d5e 100644 --- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/metadata/FileBasedMetadata.scala +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/metadata/FileBasedMetadata.scala @@ -86,6 +86,7 @@ class FileBasedMetadata( override val scheme: PartitionScheme = PartitionSchemeFactory.load(sft, meta.scheme) override val encoding: String = meta.config(Metadata.Encoding) override val leafStorage: Boolean = meta.config(Metadata.LeafStorage).toBoolean + override val pathFilterFactory: Option[PathFilterFactory] = None private val kvs = new ConcurrentHashMap[String, String](meta.config.asJava) diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/metadata/JdbcMetadata.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/metadata/JdbcMetadata.scala index 37798a068dfb..f77ec478e026 100644 --- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/metadata/JdbcMetadata.scala +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/metadata/JdbcMetadata.scala @@ -12,7 +12,7 @@ import com.typesafe.scalalogging.LazyLogging import org.apache.commons.dbcp2.{PoolableConnection, PoolingDataSource} import org.geotools.api.feature.simple.SimpleFeatureType import org.locationtech.geomesa.fs.storage.api.StorageMetadata.{PartitionMetadata, StorageFile, StorageFileAction} -import org.locationtech.geomesa.fs.storage.api.{Metadata, PartitionScheme, PartitionSchemeFactory, StorageMetadata} +import org.locationtech.geomesa.fs.storage.api.{Metadata, PartitionScheme, PartitionSchemeFactory, PathFilterFactory, StorageMetadata} import org.locationtech.geomesa.fs.storage.common.metadata.JdbcMetadata.MetadataTable import org.locationtech.geomesa.utils.io.WithClose import org.locationtech.geomesa.utils.text.StringSerialization @@ -84,6 +84,7 @@ class JdbcMetadata( override val scheme: PartitionScheme = PartitionSchemeFactory.load(sft, meta.scheme) override val encoding: String = meta.config(Metadata.Encoding) override val leafStorage: Boolean = meta.config(Metadata.LeafStorage).toBoolean + override val pathFilterFactory: Option[PathFilterFactory] = None private val kvs = new ConcurrentHashMap[String, String](meta.config.asJava) diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/pathfilters/DtgPathFilterFactory.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/pathfilters/DtgPathFilterFactory.scala new file mode 100644 index 000000000000..997fd68d6121 --- /dev/null +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/pathfilters/DtgPathFilterFactory.scala @@ -0,0 +1,87 @@ +/*********************************************************************** + * Copyright (c) 2013-2024 Commonwealth Computer Research, Inc. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Apache License, Version 2.0 + * which accompanies this distribution and is available at + * http://www.opensource.org/licenses/apache2.0.php. + ***********************************************************************/ + +package org.locationtech.geomesa.fs.storage.common.pathfilters + +import com.typesafe.scalalogging.LazyLogging +import org.apache.hadoop.fs.PathFilter +import org.geotools.api.filter.Filter +import org.locationtech.geomesa.filter.Bounds.Bound +import org.locationtech.geomesa.filter.{Bounds, FilterHelper} +import org.locationtech.geomesa.fs.storage.api.{NamedOptions, PathFilterFactory, PathFilterFactoryFactory} + +import java.time.format.DateTimeFormatter +import java.time.{ZoneOffset, ZonedDateTime} +import java.util.regex.Pattern +import scala.concurrent.duration.Duration +import scala.util.control.NonFatal + +class DtgPathFilterFactory(attribute: String, pattern: Pattern, format: DateTimeFormatter, buffer: Duration) + extends PathFilterFactory with LazyLogging { + + def apply(filter: Filter): PathFilter = { + val filterIntervals = FilterHelper.extractIntervals(filter, attribute, handleExclusiveBounds = true) + path => try { + val time = parseDtg(path.getName).toInstant + val millis = buffer.toMillis + val lower = ZonedDateTime.ofInstant(time.minusMillis(millis), ZoneOffset.UTC) + val upper = ZonedDateTime.ofInstant(time.plusMillis(millis), ZoneOffset.UTC) + val buffered = Bounds(Bound.inclusive(lower), Bound.inclusive(upper)) + filterIntervals.exists(bounds => bounds.intersects(buffered)) + } catch { + case NonFatal(ex) => + logger.warn(s"Failed to evaluate filter for path '${path.getName}'", ex) + true + } + } + + private def parseDtg(name: String): ZonedDateTime = { + Option(name) + .map(pattern.matcher) + .filter(_.matches) + .filter(_.groupCount > 0) + .map(_.group(1)) + .map(ZonedDateTime.parse(_, format)) + .getOrElse { + throw new IllegalArgumentException(s"Failed to parse ${classOf[ZonedDateTime].getName} " + + s"from file name '$name' for pattern '$pattern' and format '$format'") + } + } +} + +object DtgPathFilterFactory extends LazyLogging { + + val Name = "dtg" + + object Config { + val Attribute = "attribute" + val Pattern = "pattern" + val Format = "format" + val Buffer = "buffer" + } + + class DtgPathFilterFactoryFactory extends PathFilterFactoryFactory { + override def load(config: NamedOptions): Option[PathFilterFactory] = { + if (config.name != Name) { None } else { + val attribute = config.options.getOrElse(Config.Attribute, null) + require(attribute != null, s"$Name path filter requires a dtg attribute config '${Config.Attribute}'") + val patternConfig = config.options.getOrElse(Config.Pattern, null) + require(patternConfig != null, s"$Name path filter requires a dtg pattern config '${Config.Pattern}'") + val formatConfig = config.options.getOrElse(Config.Format, null) + require(formatConfig != null, s"$Name path filter requires a dtg format config '${Config.Format}'") + val bufferConfig = config.options.getOrElse(Config.Buffer, null) + require(bufferConfig != null, s"$Name path filter requires a buffer duration config '${Config.Buffer}'") + + val pattern = Pattern.compile(patternConfig) + val format = DateTimeFormatter.ofPattern(formatConfig).withZone(ZoneOffset.UTC) + val buffer = Duration.apply(bufferConfig) + Some(new DtgPathFilterFactory(attribute, pattern, format, buffer)) + } + } + } +} diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/test/scala/org/locationtech/geomesa/fs/storage/common/pathfilters/DtgPathFilterFactoryTest.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/test/scala/org/locationtech/geomesa/fs/storage/common/pathfilters/DtgPathFilterFactoryTest.scala new file mode 100644 index 000000000000..92d3fee31221 --- /dev/null +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/test/scala/org/locationtech/geomesa/fs/storage/common/pathfilters/DtgPathFilterFactoryTest.scala @@ -0,0 +1,55 @@ +/*********************************************************************** + * Copyright (c) 2013-2024 Commonwealth Computer Research, Inc. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Apache License, Version 2.0 + * which accompanies this distribution and is available at + * http://www.opensource.org/licenses/apache2.0.php. + ***********************************************************************/ + +package org.locationtech.geomesa.fs.storage.common.pathfilters + +import org.apache.hadoop.fs.Path +import org.geotools.filter.text.ecql.ECQL +import org.junit.runner.RunWith +import org.locationtech.geomesa.fs.storage.api.{NamedOptions, PathFilterFactory, PathFilterFactoryFactory} +import org.specs2.mutable.Specification +import org.specs2.runner.JUnitRunner + +@RunWith(classOf[JUnitRunner]) +class DtgPathFilterFactoryTest extends Specification { + + import org.locationtech.geomesa.fs.storage.common.pathfilters.DtgPathFilterFactory.Config._ + + "DtgPathFilterFactory" should { + "parse, format, buffer, and filter a dtg from a file path" in { + val attribute = "dtg" + val pattern = "^data-(.*)\\..*$" + val format = "yyyyMMddHHmm" + val buffer = "6 hours" + val config = NamedOptions(DtgPathFilterFactory.Name, + Map(Attribute -> attribute, Pattern -> pattern, Format -> format, Buffer -> buffer)) + + val pathFilterFactory = PathFilterFactoryFactory.load(config) + pathFilterFactory must beSome { factory: PathFilterFactory => + factory must haveClass[DtgPathFilterFactory] + } + + val filterText = s"$attribute DURING 2024-12-10T00:00:00Z/2024-12-11T00:00:00Z " + + s"OR $attribute = 2024-12-10T00:00:00Z OR $attribute = 2024-12-11T00:00:00Z" + val filter = ECQL.toFilter(filterText) + val pathFilter = pathFilterFactory.get.apply(filter) + + val path1 = new Path("/geomesa/fs/data-202412080000.csv") + val path2 = new Path("/geomesa/fs/data-202412092200.csv") + val path3 = new Path("/geomesa/fs/data-202412110000.csv") + val path4 = new Path("/geomesa/fs/data-202412110600.csv") + val path5 = new Path("/geomesa/fs/data-202412111000.csv") + + pathFilter.accept(path1) must beFalse + pathFilter.accept(path2) must beTrue + pathFilter.accept(path3) must beTrue + pathFilter.accept(path4) must beTrue + pathFilter.accept(path5) must beFalse + } + } +} diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/main/scala/org/locationtech/geomesa/fs/storage/converter/ConverterFileSystemReader.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/main/scala/org/locationtech/geomesa/fs/storage/converter/ConverterFileSystemReader.scala index 6db37de760bc..dd4fc1f10bf4 100644 --- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/main/scala/org/locationtech/geomesa/fs/storage/converter/ConverterFileSystemReader.scala +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/main/scala/org/locationtech/geomesa/fs/storage/converter/ConverterFileSystemReader.scala @@ -10,7 +10,7 @@ package org.locationtech.geomesa.fs.storage.converter import com.typesafe.scalalogging.StrictLogging import org.apache.commons.compress.archivers.ArchiveStreamFactory -import org.apache.hadoop.fs.{FileContext, FileSystem, Path} +import org.apache.hadoop.fs.{FileSystem, Path} import org.geotools.api.feature.simple.{SimpleFeature, SimpleFeatureType} import org.geotools.api.filter.Filter import org.locationtech.geomesa.convert.EvaluationContext diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/main/scala/org/locationtech/geomesa/fs/storage/converter/ConverterMetadata.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/main/scala/org/locationtech/geomesa/fs/storage/converter/ConverterMetadata.scala index c0fd6db0fa62..ffff7a0319fe 100644 --- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/main/scala/org/locationtech/geomesa/fs/storage/converter/ConverterMetadata.scala +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/main/scala/org/locationtech/geomesa/fs/storage/converter/ConverterMetadata.scala @@ -22,7 +22,8 @@ class ConverterMetadata( context: FileSystemContext, val sft: SimpleFeatureType, val scheme: PartitionScheme, - val leafStorage: Boolean + val leafStorage: Boolean, + val pathFilterFactory: Option[PathFilterFactory] ) extends StorageMetadata { private val dirty = new AtomicBoolean(false) diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/main/scala/org/locationtech/geomesa/fs/storage/converter/ConverterMetadataFactory.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/main/scala/org/locationtech/geomesa/fs/storage/converter/ConverterMetadataFactory.scala index 63099f9ab169..4dbb8970bb4e 100644 --- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/main/scala/org/locationtech/geomesa/fs/storage/converter/ConverterMetadataFactory.scala +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/main/scala/org/locationtech/geomesa/fs/storage/converter/ConverterMetadataFactory.scala @@ -55,7 +55,20 @@ class ConverterMetadataFactory extends StorageMetadataFactory with LazyLogging { deprecated.getOrElse(true) } - Some(new ConverterMetadata(context, sft, scheme, leafStorage)) + val pathFilterFactoryOpts = + context.conf.getValByRegex(Pattern.quote(PathFilterOptsPrefix) + ".*").asScala.map { + case (k, v) => k.substring(PathFilterOptsPrefix.length) -> v + } + + val pathFilterFactory = Option(context.conf.get(PathFilterName)).flatMap { name => + val factory = PathFilterFactoryFactory.load(NamedOptions(name, pathFilterFactoryOpts.toMap)) + if (factory.isEmpty) { + logger.warn(s"Failed to load ${classOf[PathFilterFactory].getName} for config '$name'") + } + factory + } + + Some(new ConverterMetadata(context, sft, scheme, leafStorage, pathFilterFactory)) } catch { case e: IllegalArgumentException => logger.warn(s"Couldn't create converter storage metadata: $e", e); None } diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/main/scala/org/locationtech/geomesa/fs/storage/converter/ConverterStorageFactory.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/main/scala/org/locationtech/geomesa/fs/storage/converter/ConverterStorageFactory.scala index 8b2e8433de94..7ee248544772 100644 --- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/main/scala/org/locationtech/geomesa/fs/storage/converter/ConverterStorageFactory.scala +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/main/scala/org/locationtech/geomesa/fs/storage/converter/ConverterStorageFactory.scala @@ -43,4 +43,6 @@ object ConverterStorageFactory { val LeafStorageParam = "fs.options.leaf-storage" val PartitionSchemeParam = "fs.partition-scheme.name" val PartitionOptsPrefix = "fs.partition-scheme.opts." + val PathFilterName = "fs.path-filter.name" + val PathFilterOptsPrefix = "fs.path-filter.opts." } diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/test/resources/example-convert-test/2023/01/18/example.tgz b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/test/resources/example-convert-test-1/2023/01/18/example.tgz similarity index 100% rename from geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/test/resources/example-convert-test/2023/01/18/example.tgz rename to geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/test/resources/example-convert-test-1/2023/01/18/example.tgz diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/test/resources/example-convert-test-2/2024/12/11/data-202412110600.csv b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/test/resources/example-convert-test-2/2024/12/11/data-202412110600.csv new file mode 100644 index 000000000000..f3411ac2e53c --- /dev/null +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/test/resources/example-convert-test-2/2024/12/11/data-202412110600.csv @@ -0,0 +1,3 @@ +ID,Name,Age,Dtg,Lon,Dat +0,Harry,17,2024-12-11T01:00:00.000Z,0.0,0.0 +1,Hermione,18,2024-12-11T11:00:00.000Z,0.0,0.0 \ No newline at end of file diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/test/resources/example-convert-test-2/2024/12/11/data-202412111200.csv b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/test/resources/example-convert-test-2/2024/12/11/data-202412111200.csv new file mode 100644 index 000000000000..355ab2b8c9d7 --- /dev/null +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/test/resources/example-convert-test-2/2024/12/11/data-202412111200.csv @@ -0,0 +1,3 @@ +ID,Name,Age,Dtg,Lon,Dat +2,Ronald,17,2024-12-11T07:00:00.000Z,0.0,0.0 +3,Draco,18,2024-12-11T11:00:00.000Z,0.0,0.0 \ No newline at end of file diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/test/resources/example-convert-test-2/2024/12/11/data-202412111800.csv b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/test/resources/example-convert-test-2/2024/12/11/data-202412111800.csv new file mode 100644 index 000000000000..6dff3517aac9 --- /dev/null +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/test/resources/example-convert-test-2/2024/12/11/data-202412111800.csv @@ -0,0 +1,3 @@ +ID,Name,Age,Dtg,Lon,Dat +4,Neville,17,2024-12-11T13:00:00.000Z,0.0,0.0 +5,Rubeus,43,2024-12-11T08:00:00.000Z,0.0,0.0 \ No newline at end of file diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/test/resources/example-convert-test-2/2024/12/11/data-202412112330.csv b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/test/resources/example-convert-test-2/2024/12/11/data-202412112330.csv new file mode 100644 index 000000000000..f4109e763168 --- /dev/null +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/test/resources/example-convert-test-2/2024/12/11/data-202412112330.csv @@ -0,0 +1,3 @@ +ID,Name,Age,Dtg,Lon,Dat +6,Severus,52,2024-12-11T19:00:00.000Z,0.0,0.0 +7,Alfred,78,2024-12-11T23:00:00.000Z,0.0,0.0 \ No newline at end of file diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/test/resources/example-convert-test-2/2024/12/12/data-202412120100.csv b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/test/resources/example-convert-test-2/2024/12/12/data-202412120100.csv new file mode 100644 index 000000000000..e2f234c03291 --- /dev/null +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/test/resources/example-convert-test-2/2024/12/12/data-202412120100.csv @@ -0,0 +1,3 @@ +ID,Name,Age,Dtg,Lon,Dat +8,Dean,17,2024-12-11T23:50:00.000Z,0.0,0.0 +9,Minerva,17,2024-12-12T00:30:00.000Z,0.0,0.0 \ No newline at end of file diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/test/resources/example-convert-test-2/2024/12/12/data-202412120600.csv b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/test/resources/example-convert-test-2/2024/12/12/data-202412120600.csv new file mode 100644 index 000000000000..b92224375b13 --- /dev/null +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/test/resources/example-convert-test-2/2024/12/12/data-202412120600.csv @@ -0,0 +1,3 @@ +ID,Name,Age,Dtg,Lon,Dat +10,Luna,17,2024-12-12T03:00:00.000Z,0.0,0.0 +11,Dudley,17,2024-12-12T05:00:00.000Z,0.0,0.0 \ No newline at end of file diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/test/scala/org/locationtech/geomesa/fs/storage/converter/ConverterFileSystemStorageTest.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/test/scala/org/locationtech/geomesa/fs/storage/converter/ConverterFileSystemStorageTest.scala index 5d6c2eb6683b..a982c8f1289f 100644 --- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/test/scala/org/locationtech/geomesa/fs/storage/converter/ConverterFileSystemStorageTest.scala +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/test/scala/org/locationtech/geomesa/fs/storage/converter/ConverterFileSystemStorageTest.scala @@ -10,7 +10,7 @@ package org.locationtech.geomesa.fs.storage.converter import com.typesafe.scalalogging.LazyLogging import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.hadoop.fs.Path import org.geotools.api.data.Query import org.geotools.filter.text.ecql.ECQL import org.junit.runner.RunWith @@ -23,40 +23,43 @@ import org.specs2.runner.JUnitRunner @RunWith(classOf[JUnitRunner]) class ConverterFileSystemStorageTest extends Specification with LazyLogging { + private val sftConfig = + """geomesa.sfts.example = { + | attributes = [ + | { name = "name", type = "String" } + | { name = "age", type = "Int" } + | { name = "dtg", type = "Date", default = true } + | { name = "geom", type = "Point", srid = 4326, default = true } + | ] + |}""".stripMargin + private val converterConfig = + """geomesa.converters.example = { + | type = "delimited-text" + | format = "CSV" + | options { + | skip-lines = 1 + | } + | id-field = "toString($fid)", + | fields = [ + | { name = "fid", transform = "$1::int" } + | { name = "name", transform = "$2::string" } + | { name = "age", transform = "$3::int" } + | { name = "dtg", transform = "datetime($4)" } + | { name = "lon", transform = "$5::double" } + | { name = "lat", transform = "$6::double" } + | { name = "geom", transform = "point($lon, $lat)" } + | ] + |}""".stripMargin + "ConverterFileSystemStorage" should { "read features in compressed tar.gz files" in { - val dir = Option(getClass.getClassLoader.getResource("example-convert-test")).map(_.toURI).orNull + val dir = Option(getClass.getClassLoader.getResource("example-convert-test-1")).map(_.toURI).orNull dir must not(beNull) val conf = new Configuration() - conf.set(ConverterStorageFactory.ConverterPathParam, "example-convert-test") - conf.set(ConverterStorageFactory.SftConfigParam, - """geomesa.sfts.example = { - | attributes = [ - | { name = "name", type = "String" } - | { name = "age", type = "Int" } - | { name = "dtg", type = "Date", default = true } - | { name = "geom", type = "Point", srid = 4326, default = true } - | ] - |}""".stripMargin) - conf.set(ConverterStorageFactory.ConverterConfigParam, - """geomesa.converters.example = { - | type = "delimited-text" - | format = "CSV" - | options { - | skip-lines = 1 - | } - | id-field = "toString($fid)", - | fields = [ - | { name = "fid", transform = "$1::int" } - | { name = "name", transform = "$2::string" } - | { name = "age", transform = "$3::int" } - | { name = "dtg", transform = "datetime($4)" } - | { name = "lon", transform = "$5::double" } - | { name = "lat", transform = "$6::double" } - | { name = "geom", transform = "point($lon, $lat)" } - | ] - |}""".stripMargin) + conf.set(ConverterStorageFactory.ConverterPathParam, "example-convert-test-1") + conf.set(ConverterStorageFactory.SftConfigParam, sftConfig) + conf.set(ConverterStorageFactory.ConverterConfigParam, converterConfig) conf.set(ConverterStorageFactory.PartitionSchemeParam, "daily") conf.set(ConverterStorageFactory.LeafStorageParam, "false") @@ -74,5 +77,45 @@ class ConverterFileSystemStorageTest extends Specification with LazyLogging { features must haveLength(6) } + + "filter file paths by dtg" in { + val dir = Option(getClass.getClassLoader.getResource("example-convert-test-2")).map(_.toURI).orNull + dir must not(beNull) + + val conf = new Configuration() + conf.set(ConverterStorageFactory.ConverterPathParam, "example-convert-test-2") + conf.set(ConverterStorageFactory.SftConfigParam, sftConfig) + conf.set(ConverterStorageFactory.ConverterConfigParam, converterConfig) + conf.set(ConverterStorageFactory.PartitionSchemeParam, "receipt-time") + conf.set(ConverterStorageFactory.PartitionOptsPrefix + "datetime-scheme", "daily") + conf.set(ConverterStorageFactory.PartitionOptsPrefix + "buffer", "10 minutes") + conf.set(ConverterStorageFactory.LeafStorageParam, "false") + conf.set(ConverterStorageFactory.PathFilterName, "dtg") + conf.set(ConverterStorageFactory.PathFilterOptsPrefix + "attribute", "dtg") + conf.set(ConverterStorageFactory.PathFilterOptsPrefix + "pattern", "^data-(.*)\\.csv$") + conf.set(ConverterStorageFactory.PathFilterOptsPrefix + "format", "yyyyMMddHHmm") + conf.set(ConverterStorageFactory.PathFilterOptsPrefix + "buffer", "2 hours") + + val context = FileSystemContext(new Path(dir), conf) + val metadata = StorageMetadataFactory.load(context).orNull + metadata must not(beNull) + metadata must haveClass[ConverterMetadata] + val storage = FileSystemStorageFactory(context, metadata) + + val filterText = "dtg DURING 2024-12-11T10:00:00Z/2024-12-11T23:55:00Z " + + "OR dtg = 2024-12-11T10:00:00Z OR dtg = 2024-12-11T23:55:00Z" + val query = new Query(metadata.sft.getTypeName, ECQL.toFilter(filterText)) + val features = { + val iter = SelfClosingIterator(storage.getReader(query)) + // note: need to copy features in iterator as same object is re-used + iter.map(ScalaSimpleFeature.copy).toList + } + + // id 1 is excluded because of the path dtg filter even though dtg is within filter bounds + // id 5 is excluded because dtg is outside filter bounds even though included by path filter + // id 8 is included because within partition scheme buffer and path filter buffer + features must haveLength(5) + features.map(_.getID) must containTheSameElementsAs(Seq("3", "4", "6", "7", "8")) + } } } diff --git a/geomesa-fs/geomesa-fs-tools/src/main/scala/org/locationtech/geomesa/fs/tools/compact/PartitionInputFormat.scala b/geomesa-fs/geomesa-fs-tools/src/main/scala/org/locationtech/geomesa/fs/tools/compact/PartitionInputFormat.scala index 8895273e76ca..f20617dd4722 100644 --- a/geomesa-fs/geomesa-fs-tools/src/main/scala/org/locationtech/geomesa/fs/tools/compact/PartitionInputFormat.scala +++ b/geomesa-fs/geomesa-fs-tools/src/main/scala/org/locationtech/geomesa/fs/tools/compact/PartitionInputFormat.scala @@ -8,7 +8,7 @@ package org.locationtech.geomesa.fs.tools.compact -import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.hadoop.fs.Path import org.apache.hadoop.io.Writable import org.apache.hadoop.mapreduce._ import org.geotools.api.data.Query @@ -133,7 +133,8 @@ object PartitionInputFormat { } // use a cached metadata impl instead of reloading val data = PartitionMetadata(partition, files, None, 0L) - val cached = new CachedMetadata(metadata.sft, metadata.encoding, metadata.scheme, metadata.leafStorage, data) + val cached = new CachedMetadata(metadata.sft, metadata.encoding, metadata.scheme, metadata.leafStorage, + metadata.pathFilterFactory, data) storage = FileSystemStorageFactory(fsc, cached) reader = storage.getReader(new Query("", Filter.INCLUDE), Option(partition)) metadata.close() @@ -163,6 +164,7 @@ object PartitionInputFormat { val encoding: String, val scheme: PartitionScheme, val leafStorage: Boolean, + val pathFilterFactory: Option[PathFilterFactory], partition: PartitionMetadata ) extends StorageMetadata { override def getPartition(name: String): Option[PartitionMetadata] =