From fb05e712fe2ef38dae3f8fd113f30d16aa8fe000 Mon Sep 17 00:00:00 2001 From: Forrest Feaser Date: Fri, 13 Dec 2024 15:32:41 -0500 Subject: [PATCH] Address MR comments 1 --- .../api/PathFilterFactoryFactory.scala | 40 ------------------- .../fs/storage/api/StorageMetadata.scala | 7 ---- ...sa.fs.storage.api.PathFilterFactoryFactory | 1 - .../common/AbstractFileSystemStorage.scala | 5 +-- .../common/metadata/FileBasedMetadata.scala | 1 - .../common/metadata/JdbcMetadata.scala | 3 +- ....converter.pathfilter.PathFilteringFactory | 1 + .../converter/ConverterFileSystemReader.scala | 38 +++++++++++------- .../storage/converter/ConverterMetadata.scala | 3 +- .../converter/ConverterMetadataFactory.scala | 15 +------ .../storage/converter/ConverterStorage.scala | 8 +++- .../converter/ConverterStorageFactory.scala | 24 +++++++++-- .../pathfilter/DtgPathFiltering.scala} | 16 ++++---- .../converter/pathfilter/PathFiltering.scala} | 4 +- .../pathfilter/PathFilteringFactory.scala | 26 ++++++++++++ .../2024/12/12/data-202412120100.csv | 2 +- .../2024/12/12/data-202412120600.csv | 2 +- .../pathfilter/DtgPathFilteringTest.scala} | 16 ++++---- .../tools/compact/PartitionInputFormat.scala | 5 +-- 19 files changed, 104 insertions(+), 113 deletions(-) delete mode 100644 geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-api/src/main/scala/org/locationtech/geomesa/fs/storage/api/PathFilterFactoryFactory.scala delete mode 100644 geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/resources/META-INF/services/org.locationtech.geomesa.fs.storage.api.PathFilterFactoryFactory create mode 100644 geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/main/resources/META-INF/services/org.locationtech.geomesa.fs.storage.converter.pathfilter.PathFilteringFactory rename geomesa-fs/geomesa-fs-storage/{geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/pathfilters/DtgPathFilterFactory.scala => geomesa-fs-storage-convert/src/main/scala/org/locationtech/geomesa/fs/storage/converter/pathfilter/DtgPathFiltering.scala} (84%) rename geomesa-fs/geomesa-fs-storage/{geomesa-fs-storage-api/src/main/scala/org/locationtech/geomesa/fs/storage/api/PathFilterFactory.scala => geomesa-fs-storage-convert/src/main/scala/org/locationtech/geomesa/fs/storage/converter/pathfilter/PathFiltering.scala} (87%) create mode 100644 geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/main/scala/org/locationtech/geomesa/fs/storage/converter/pathfilter/PathFilteringFactory.scala rename geomesa-fs/geomesa-fs-storage/{geomesa-fs-storage-common/src/test/scala/org/locationtech/geomesa/fs/storage/common/pathfilters/DtgPathFilterFactoryTest.scala => geomesa-fs-storage-convert/src/test/scala/org/locationtech/geomesa/fs/storage/converter/pathfilter/DtgPathFilteringTest.scala} (76%) diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-api/src/main/scala/org/locationtech/geomesa/fs/storage/api/PathFilterFactoryFactory.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-api/src/main/scala/org/locationtech/geomesa/fs/storage/api/PathFilterFactoryFactory.scala deleted file mode 100644 index 98d22c535c85..000000000000 --- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-api/src/main/scala/org/locationtech/geomesa/fs/storage/api/PathFilterFactoryFactory.scala +++ /dev/null @@ -1,40 +0,0 @@ -/*********************************************************************** - * Copyright (c) 2013-2024 Commonwealth Computer Research, Inc. - * All rights reserved. This program and the accompanying materials - * are made available under the terms of the Apache License, Version 2.0 - * which accompanies this distribution and is available at - * http://www.opensource.org/licenses/apache2.0.php. - ***********************************************************************/ - -package org.locationtech.geomesa.fs.storage.api - -import java.util.ServiceLoader - -/** - * Factory for loading path filters - */ -trait PathFilterFactoryFactory { - - /** - * Load a path filter factory - * - * @param config factory config options - * @return - */ - def load(config: NamedOptions): Option[PathFilterFactory] -} - -object PathFilterFactoryFactory { - - import scala.collection.JavaConverters._ - - private lazy val factories = ServiceLoader.load(classOf[PathFilterFactoryFactory]).asScala.toSeq - - /** - * Create a path filter factory instance via SPI lookup - * - * @param config factory config options - * @return - */ - def load(config: NamedOptions): Option[PathFilterFactory] = factories.toStream.flatMap(_.load(config)).headOption -} diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-api/src/main/scala/org/locationtech/geomesa/fs/storage/api/StorageMetadata.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-api/src/main/scala/org/locationtech/geomesa/fs/storage/api/StorageMetadata.scala index 7f6189207d7e..be80550dae4a 100644 --- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-api/src/main/scala/org/locationtech/geomesa/fs/storage/api/StorageMetadata.scala +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-api/src/main/scala/org/locationtech/geomesa/fs/storage/api/StorageMetadata.scala @@ -53,13 +53,6 @@ trait StorageMetadata extends Compactable with Closeable { */ def leafStorage: Boolean - /** - * The path filter factory used to filter paths during queries - * - * @return path filter factory - */ - def pathFilterFactory: Option[PathFilterFactory] - /** * Get a previously set key-value pair * diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/resources/META-INF/services/org.locationtech.geomesa.fs.storage.api.PathFilterFactoryFactory b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/resources/META-INF/services/org.locationtech.geomesa.fs.storage.api.PathFilterFactoryFactory deleted file mode 100644 index 6f908f6272d9..000000000000 --- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/resources/META-INF/services/org.locationtech.geomesa.fs.storage.api.PathFilterFactoryFactory +++ /dev/null @@ -1 +0,0 @@ -org.locationtech.geomesa.fs.storage.common.pathfilters.DtgPathFilterFactory$DtgPathFilterFactoryFactory diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/AbstractFileSystemStorage.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/AbstractFileSystemStorage.scala index 74f8940c0373..5fcfb47ac9cf 100644 --- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/AbstractFileSystemStorage.scala +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/AbstractFileSystemStorage.scala @@ -127,10 +127,9 @@ abstract class AbstractFileSystemStorage( fp.partitions.mkString(", ")) reader } - val pathFilter = metadata.pathFilterFactory.map(_.apply(fp.filter)) // each partition must be read separately, to ensure modifications are handled correctly - fp.partitions.iterator.flatMap { path => - val files = getFilePaths(path).filter(p => pathFilter.forall(_.accept(p.path))) + fp.partitions.iterator.flatMap { p => + val files = getFilePaths(p) if (files.isEmpty) { Iterator.empty } else { Iterator.single(reader -> files) } } } diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/metadata/FileBasedMetadata.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/metadata/FileBasedMetadata.scala index fee378b54d5e..e8ea0f492ecc 100644 --- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/metadata/FileBasedMetadata.scala +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/metadata/FileBasedMetadata.scala @@ -86,7 +86,6 @@ class FileBasedMetadata( override val scheme: PartitionScheme = PartitionSchemeFactory.load(sft, meta.scheme) override val encoding: String = meta.config(Metadata.Encoding) override val leafStorage: Boolean = meta.config(Metadata.LeafStorage).toBoolean - override val pathFilterFactory: Option[PathFilterFactory] = None private val kvs = new ConcurrentHashMap[String, String](meta.config.asJava) diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/metadata/JdbcMetadata.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/metadata/JdbcMetadata.scala index f77ec478e026..37798a068dfb 100644 --- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/metadata/JdbcMetadata.scala +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/metadata/JdbcMetadata.scala @@ -12,7 +12,7 @@ import com.typesafe.scalalogging.LazyLogging import org.apache.commons.dbcp2.{PoolableConnection, PoolingDataSource} import org.geotools.api.feature.simple.SimpleFeatureType import org.locationtech.geomesa.fs.storage.api.StorageMetadata.{PartitionMetadata, StorageFile, StorageFileAction} -import org.locationtech.geomesa.fs.storage.api.{Metadata, PartitionScheme, PartitionSchemeFactory, PathFilterFactory, StorageMetadata} +import org.locationtech.geomesa.fs.storage.api.{Metadata, PartitionScheme, PartitionSchemeFactory, StorageMetadata} import org.locationtech.geomesa.fs.storage.common.metadata.JdbcMetadata.MetadataTable import org.locationtech.geomesa.utils.io.WithClose import org.locationtech.geomesa.utils.text.StringSerialization @@ -84,7 +84,6 @@ class JdbcMetadata( override val scheme: PartitionScheme = PartitionSchemeFactory.load(sft, meta.scheme) override val encoding: String = meta.config(Metadata.Encoding) override val leafStorage: Boolean = meta.config(Metadata.LeafStorage).toBoolean - override val pathFilterFactory: Option[PathFilterFactory] = None private val kvs = new ConcurrentHashMap[String, String](meta.config.asJava) diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/main/resources/META-INF/services/org.locationtech.geomesa.fs.storage.converter.pathfilter.PathFilteringFactory b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/main/resources/META-INF/services/org.locationtech.geomesa.fs.storage.converter.pathfilter.PathFilteringFactory new file mode 100644 index 000000000000..f10142b8bf99 --- /dev/null +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/main/resources/META-INF/services/org.locationtech.geomesa.fs.storage.converter.pathfilter.PathFilteringFactory @@ -0,0 +1 @@ +org.locationtech.geomesa.fs.storage.converter.pathfilter.DtgPathFiltering$DtgPathFilteringFactory diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/main/scala/org/locationtech/geomesa/fs/storage/converter/ConverterFileSystemReader.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/main/scala/org/locationtech/geomesa/fs/storage/converter/ConverterFileSystemReader.scala index dd4fc1f10bf4..1edb486d07d7 100644 --- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/main/scala/org/locationtech/geomesa/fs/storage/converter/ConverterFileSystemReader.scala +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/main/scala/org/locationtech/geomesa/fs/storage/converter/ConverterFileSystemReader.scala @@ -10,13 +10,14 @@ package org.locationtech.geomesa.fs.storage.converter import com.typesafe.scalalogging.StrictLogging import org.apache.commons.compress.archivers.ArchiveStreamFactory -import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.hadoop.fs.{FileSystem, Path, PathFilter} import org.geotools.api.feature.simple.{SimpleFeature, SimpleFeatureType} import org.geotools.api.filter.Filter import org.locationtech.geomesa.convert.EvaluationContext import org.locationtech.geomesa.convert2.SimpleFeatureConverter import org.locationtech.geomesa.features.{ScalaSimpleFeature, TransformSimpleFeature} import org.locationtech.geomesa.fs.storage.common.AbstractFileSystemStorage.FileSystemPathReader +import org.locationtech.geomesa.fs.storage.converter.pathfilter.PathFiltering import org.locationtech.geomesa.utils.collection.CloseableIterator import org.locationtech.geomesa.utils.hadoop.HadoopDelegate.{HadoopFileHandle, HadoopTarHandle, HadoopZipHandle} import org.locationtech.geomesa.utils.io.PathUtils @@ -28,27 +29,34 @@ class ConverterFileSystemReader( fs: FileSystem, converter: SimpleFeatureConverter, filter: Option[Filter], - transform: Option[(String, SimpleFeatureType)] + transform: Option[(String, SimpleFeatureType)], + pathFiltering: Option[PathFiltering] ) extends FileSystemPathReader with StrictLogging { import ArchiveStreamFactory.{JAR, TAR, ZIP} + private lazy val pathFilter: Option[PathFilter] = pathFiltering.flatMap(pf => filter.map(pf.apply)) + override def read(path: Path): CloseableIterator[SimpleFeature] = { - logger.debug(s"Opening file $path") - val iter = try { - val handle = PathUtils.getUncompressedExtension(path.getName).toLowerCase(Locale.US) match { - case TAR => new HadoopTarHandle(fs, path) - case ZIP | JAR => new HadoopZipHandle(fs, path) - case _ => new HadoopFileHandle(fs, path) - } - handle.open.flatMap { case (name, is) => - val params = EvaluationContext.inputFileParam(name.getOrElse(handle.path)) - converter.process(is, converter.createEvaluationContext(params)) + if (pathFilter.forall(_.accept(path))) { + logger.debug(s"Opening file $path") + val iter = try { + val handle = PathUtils.getUncompressedExtension(path.getName).toLowerCase(Locale.US) match { + case TAR => new HadoopTarHandle(fs, path) + case ZIP | JAR => new HadoopZipHandle(fs, path) + case _ => new HadoopFileHandle(fs, path) + } + handle.open.flatMap { case (name, is) => + val params = EvaluationContext.inputFileParam(name.getOrElse(handle.path)) + converter.process(is, converter.createEvaluationContext(params)) + } + } catch { + case NonFatal(e) => logger.error(s"Error processing uri '$path'", e); CloseableIterator.empty } - } catch { - case NonFatal(e) => logger.error(s"Error processing uri '$path'", e); CloseableIterator.empty + transformed(filtered(iter)) + } else { + CloseableIterator.empty } - transformed(filtered(iter)) } private def filtered(in: CloseableIterator[SimpleFeature]): CloseableIterator[SimpleFeature] = { diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/main/scala/org/locationtech/geomesa/fs/storage/converter/ConverterMetadata.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/main/scala/org/locationtech/geomesa/fs/storage/converter/ConverterMetadata.scala index ffff7a0319fe..c0fd6db0fa62 100644 --- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/main/scala/org/locationtech/geomesa/fs/storage/converter/ConverterMetadata.scala +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/main/scala/org/locationtech/geomesa/fs/storage/converter/ConverterMetadata.scala @@ -22,8 +22,7 @@ class ConverterMetadata( context: FileSystemContext, val sft: SimpleFeatureType, val scheme: PartitionScheme, - val leafStorage: Boolean, - val pathFilterFactory: Option[PathFilterFactory] + val leafStorage: Boolean ) extends StorageMetadata { private val dirty = new AtomicBoolean(false) diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/main/scala/org/locationtech/geomesa/fs/storage/converter/ConverterMetadataFactory.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/main/scala/org/locationtech/geomesa/fs/storage/converter/ConverterMetadataFactory.scala index 4dbb8970bb4e..63099f9ab169 100644 --- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/main/scala/org/locationtech/geomesa/fs/storage/converter/ConverterMetadataFactory.scala +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/main/scala/org/locationtech/geomesa/fs/storage/converter/ConverterMetadataFactory.scala @@ -55,20 +55,7 @@ class ConverterMetadataFactory extends StorageMetadataFactory with LazyLogging { deprecated.getOrElse(true) } - val pathFilterFactoryOpts = - context.conf.getValByRegex(Pattern.quote(PathFilterOptsPrefix) + ".*").asScala.map { - case (k, v) => k.substring(PathFilterOptsPrefix.length) -> v - } - - val pathFilterFactory = Option(context.conf.get(PathFilterName)).flatMap { name => - val factory = PathFilterFactoryFactory.load(NamedOptions(name, pathFilterFactoryOpts.toMap)) - if (factory.isEmpty) { - logger.warn(s"Failed to load ${classOf[PathFilterFactory].getName} for config '$name'") - } - factory - } - - Some(new ConverterMetadata(context, sft, scheme, leafStorage, pathFilterFactory)) + Some(new ConverterMetadata(context, sft, scheme, leafStorage)) } catch { case e: IllegalArgumentException => logger.warn(s"Couldn't create converter storage metadata: $e", e); None } diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/main/scala/org/locationtech/geomesa/fs/storage/converter/ConverterStorage.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/main/scala/org/locationtech/geomesa/fs/storage/converter/ConverterStorage.scala index 8a4503fb2b80..f507bc019519 100644 --- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/main/scala/org/locationtech/geomesa/fs/storage/converter/ConverterStorage.scala +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/main/scala/org/locationtech/geomesa/fs/storage/converter/ConverterStorage.scala @@ -19,8 +19,12 @@ import org.locationtech.geomesa.fs.storage.common.AbstractFileSystemStorage import org.locationtech.geomesa.fs.storage.common.AbstractFileSystemStorage.FileSystemPathReader import org.locationtech.geomesa.fs.storage.common.observer.FileSystemObserver import org.locationtech.geomesa.fs.storage.common.utils.PathCache +import org.locationtech.geomesa.fs.storage.converter.pathfilter.PathFiltering -class ConverterStorage(context: FileSystemContext, metadata: StorageMetadata, converter: SimpleFeatureConverter) +class ConverterStorage(context: FileSystemContext, + metadata: StorageMetadata, + converter: SimpleFeatureConverter, + pathFiltering: Option[PathFiltering]) extends AbstractFileSystemStorage(context, metadata, "") { // TODO close converter... @@ -36,7 +40,7 @@ class ConverterStorage(context: FileSystemContext, metadata: StorageMetadata, co override protected def createReader( filter: Option[Filter], transform: Option[(String, SimpleFeatureType)]): FileSystemPathReader = { - new ConverterFileSystemReader(context.fs, converter, filter, transform) + new ConverterFileSystemReader(context.fs, converter, filter, transform, pathFiltering) } override def getFilePaths(partition: String): Seq[StorageFilePath] = { diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/main/scala/org/locationtech/geomesa/fs/storage/converter/ConverterStorageFactory.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/main/scala/org/locationtech/geomesa/fs/storage/converter/ConverterStorageFactory.scala index 7ee248544772..92ec21e00dd9 100644 --- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/main/scala/org/locationtech/geomesa/fs/storage/converter/ConverterStorageFactory.scala +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/main/scala/org/locationtech/geomesa/fs/storage/converter/ConverterStorageFactory.scala @@ -12,10 +12,15 @@ import com.typesafe.scalalogging.LazyLogging import org.locationtech.geomesa.convert.{ConfArgs, ConverterConfigResolver} import org.locationtech.geomesa.convert2.SimpleFeatureConverter import org.locationtech.geomesa.fs.storage.api._ -import org.locationtech.geomesa.fs.storage.converter.ConverterStorageFactory.{ConverterConfigParam, ConverterNameParam} +import org.locationtech.geomesa.fs.storage.converter.ConverterStorageFactory._ +import org.locationtech.geomesa.fs.storage.converter.pathfilter.{PathFiltering, PathFilteringFactory} + +import java.util.regex.Pattern class ConverterStorageFactory extends FileSystemStorageFactory with LazyLogging { + import scala.collection.JavaConverters._ + override val encoding: String = "converter" override def apply(context: FileSystemContext, metadata: StorageMetadata): FileSystemStorage = { @@ -29,9 +34,22 @@ class ConverterStorageFactory extends FileSystemStorageFactory with LazyLogging } SimpleFeatureConverter(metadata.sft, converterConfig) } - new ConverterStorage(context, metadata, converter) - } + val pathFilteringOpts = + context.conf.getValByRegex(Pattern.quote(PathFilterOptsPrefix) + ".*").asScala.map { + case (k, v) => k.substring(PathFilterOptsPrefix.length) -> v + } + + val pathFiltering = Option(context.conf.get(PathFilterName)).flatMap { name => + val factory = PathFilteringFactory.load(NamedOptions(name, pathFilteringOpts.toMap)) + if (factory.isEmpty) { + logger.warn(s"Failed to load ${classOf[PathFiltering].getName} for config '$name'") + } + factory + } + + new ConverterStorage(context, metadata, converter, pathFiltering) + } } object ConverterStorageFactory { diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/pathfilters/DtgPathFilterFactory.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/main/scala/org/locationtech/geomesa/fs/storage/converter/pathfilter/DtgPathFiltering.scala similarity index 84% rename from geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/pathfilters/DtgPathFilterFactory.scala rename to geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/main/scala/org/locationtech/geomesa/fs/storage/converter/pathfilter/DtgPathFiltering.scala index 997fd68d6121..cd37e00f9539 100644 --- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/pathfilters/DtgPathFilterFactory.scala +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/main/scala/org/locationtech/geomesa/fs/storage/converter/pathfilter/DtgPathFiltering.scala @@ -6,14 +6,14 @@ * http://www.opensource.org/licenses/apache2.0.php. ***********************************************************************/ -package org.locationtech.geomesa.fs.storage.common.pathfilters +package org.locationtech.geomesa.fs.storage.converter.pathfilter import com.typesafe.scalalogging.LazyLogging import org.apache.hadoop.fs.PathFilter import org.geotools.api.filter.Filter import org.locationtech.geomesa.filter.Bounds.Bound import org.locationtech.geomesa.filter.{Bounds, FilterHelper} -import org.locationtech.geomesa.fs.storage.api.{NamedOptions, PathFilterFactory, PathFilterFactoryFactory} +import org.locationtech.geomesa.fs.storage.api.NamedOptions import java.time.format.DateTimeFormatter import java.time.{ZoneOffset, ZonedDateTime} @@ -21,8 +21,8 @@ import java.util.regex.Pattern import scala.concurrent.duration.Duration import scala.util.control.NonFatal -class DtgPathFilterFactory(attribute: String, pattern: Pattern, format: DateTimeFormatter, buffer: Duration) - extends PathFilterFactory with LazyLogging { +class DtgPathFiltering(attribute: String, pattern: Pattern, format: DateTimeFormatter, buffer: Duration) + extends PathFiltering with LazyLogging { def apply(filter: Filter): PathFilter = { val filterIntervals = FilterHelper.extractIntervals(filter, attribute, handleExclusiveBounds = true) @@ -54,7 +54,7 @@ class DtgPathFilterFactory(attribute: String, pattern: Pattern, format: DateTime } } -object DtgPathFilterFactory extends LazyLogging { +object DtgPathFiltering extends LazyLogging { val Name = "dtg" @@ -65,8 +65,8 @@ object DtgPathFilterFactory extends LazyLogging { val Buffer = "buffer" } - class DtgPathFilterFactoryFactory extends PathFilterFactoryFactory { - override def load(config: NamedOptions): Option[PathFilterFactory] = { + class DtgPathFilteringFactory extends PathFilteringFactory { + override def load(config: NamedOptions): Option[PathFiltering] = { if (config.name != Name) { None } else { val attribute = config.options.getOrElse(Config.Attribute, null) require(attribute != null, s"$Name path filter requires a dtg attribute config '${Config.Attribute}'") @@ -80,7 +80,7 @@ object DtgPathFilterFactory extends LazyLogging { val pattern = Pattern.compile(patternConfig) val format = DateTimeFormatter.ofPattern(formatConfig).withZone(ZoneOffset.UTC) val buffer = Duration.apply(bufferConfig) - Some(new DtgPathFilterFactory(attribute, pattern, format, buffer)) + Some(new DtgPathFiltering(attribute, pattern, format, buffer)) } } } diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-api/src/main/scala/org/locationtech/geomesa/fs/storage/api/PathFilterFactory.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/main/scala/org/locationtech/geomesa/fs/storage/converter/pathfilter/PathFiltering.scala similarity index 87% rename from geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-api/src/main/scala/org/locationtech/geomesa/fs/storage/api/PathFilterFactory.scala rename to geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/main/scala/org/locationtech/geomesa/fs/storage/converter/pathfilter/PathFiltering.scala index 5b32252e7833..ea0bdfb25985 100644 --- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-api/src/main/scala/org/locationtech/geomesa/fs/storage/api/PathFilterFactory.scala +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/main/scala/org/locationtech/geomesa/fs/storage/converter/pathfilter/PathFiltering.scala @@ -6,11 +6,11 @@ * http://www.opensource.org/licenses/apache2.0.php. ***********************************************************************/ -package org.locationtech.geomesa.fs.storage.api +package org.locationtech.geomesa.fs.storage.converter.pathfilter import org.apache.hadoop.fs.PathFilter import org.geotools.api.filter.Filter -trait PathFilterFactory { +trait PathFiltering { def apply(filter: Filter): PathFilter } diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/main/scala/org/locationtech/geomesa/fs/storage/converter/pathfilter/PathFilteringFactory.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/main/scala/org/locationtech/geomesa/fs/storage/converter/pathfilter/PathFilteringFactory.scala new file mode 100644 index 000000000000..1073b07d6fed --- /dev/null +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/main/scala/org/locationtech/geomesa/fs/storage/converter/pathfilter/PathFilteringFactory.scala @@ -0,0 +1,26 @@ +/*********************************************************************** + * Copyright (c) 2013-2024 Commonwealth Computer Research, Inc. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Apache License, Version 2.0 + * which accompanies this distribution and is available at + * http://www.opensource.org/licenses/apache2.0.php. + ***********************************************************************/ + +package org.locationtech.geomesa.fs.storage.converter.pathfilter + +import org.locationtech.geomesa.fs.storage.api.NamedOptions + +import java.util.ServiceLoader + +trait PathFilteringFactory { + def load(config: NamedOptions): Option[PathFiltering] +} + +object PathFilteringFactory { + + import scala.collection.JavaConverters._ + + private lazy val factories = ServiceLoader.load(classOf[PathFilteringFactory]).asScala.toSeq + + def load(config: NamedOptions): Option[PathFiltering] = factories.toStream.flatMap(_.load(config)).headOption +} diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/test/resources/example-convert-test-2/2024/12/12/data-202412120100.csv b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/test/resources/example-convert-test-2/2024/12/12/data-202412120100.csv index e2f234c03291..314fc56bb7d3 100644 --- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/test/resources/example-convert-test-2/2024/12/12/data-202412120100.csv +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/test/resources/example-convert-test-2/2024/12/12/data-202412120100.csv @@ -1,3 +1,3 @@ ID,Name,Age,Dtg,Lon,Dat 8,Dean,17,2024-12-11T23:50:00.000Z,0.0,0.0 -9,Minerva,17,2024-12-12T00:30:00.000Z,0.0,0.0 \ No newline at end of file +9,Minerva,57,2024-12-12T00:30:00.000Z,0.0,0.0 \ No newline at end of file diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/test/resources/example-convert-test-2/2024/12/12/data-202412120600.csv b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/test/resources/example-convert-test-2/2024/12/12/data-202412120600.csv index b92224375b13..4d1e2bd46ba7 100644 --- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/test/resources/example-convert-test-2/2024/12/12/data-202412120600.csv +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/test/resources/example-convert-test-2/2024/12/12/data-202412120600.csv @@ -1,3 +1,3 @@ ID,Name,Age,Dtg,Lon,Dat 10,Luna,17,2024-12-12T03:00:00.000Z,0.0,0.0 -11,Dudley,17,2024-12-12T05:00:00.000Z,0.0,0.0 \ No newline at end of file +11,Dudley,19,2024-12-12T05:00:00.000Z,0.0,0.0 \ No newline at end of file diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/test/scala/org/locationtech/geomesa/fs/storage/common/pathfilters/DtgPathFilterFactoryTest.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/test/scala/org/locationtech/geomesa/fs/storage/converter/pathfilter/DtgPathFilteringTest.scala similarity index 76% rename from geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/test/scala/org/locationtech/geomesa/fs/storage/common/pathfilters/DtgPathFilterFactoryTest.scala rename to geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/test/scala/org/locationtech/geomesa/fs/storage/converter/pathfilter/DtgPathFilteringTest.scala index 92d3fee31221..77db35056d9e 100644 --- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/test/scala/org/locationtech/geomesa/fs/storage/common/pathfilters/DtgPathFilterFactoryTest.scala +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/test/scala/org/locationtech/geomesa/fs/storage/converter/pathfilter/DtgPathFilteringTest.scala @@ -6,19 +6,19 @@ * http://www.opensource.org/licenses/apache2.0.php. ***********************************************************************/ -package org.locationtech.geomesa.fs.storage.common.pathfilters +package org.locationtech.geomesa.fs.storage.converter.pathfilter import org.apache.hadoop.fs.Path import org.geotools.filter.text.ecql.ECQL import org.junit.runner.RunWith -import org.locationtech.geomesa.fs.storage.api.{NamedOptions, PathFilterFactory, PathFilterFactoryFactory} +import org.locationtech.geomesa.fs.storage.api.NamedOptions import org.specs2.mutable.Specification import org.specs2.runner.JUnitRunner @RunWith(classOf[JUnitRunner]) -class DtgPathFilterFactoryTest extends Specification { +class DtgPathFilteringTest extends Specification { - import org.locationtech.geomesa.fs.storage.common.pathfilters.DtgPathFilterFactory.Config._ + import org.locationtech.geomesa.fs.storage.converter.pathfilter.DtgPathFiltering.Config._ "DtgPathFilterFactory" should { "parse, format, buffer, and filter a dtg from a file path" in { @@ -26,12 +26,12 @@ class DtgPathFilterFactoryTest extends Specification { val pattern = "^data-(.*)\\..*$" val format = "yyyyMMddHHmm" val buffer = "6 hours" - val config = NamedOptions(DtgPathFilterFactory.Name, + val config = NamedOptions(DtgPathFiltering.Name, Map(Attribute -> attribute, Pattern -> pattern, Format -> format, Buffer -> buffer)) - val pathFilterFactory = PathFilterFactoryFactory.load(config) - pathFilterFactory must beSome { factory: PathFilterFactory => - factory must haveClass[DtgPathFilterFactory] + val pathFilterFactory = PathFilteringFactory.load(config) + pathFilterFactory must beSome { factory: PathFiltering => + factory must haveClass[DtgPathFiltering] } val filterText = s"$attribute DURING 2024-12-10T00:00:00Z/2024-12-11T00:00:00Z " + diff --git a/geomesa-fs/geomesa-fs-tools/src/main/scala/org/locationtech/geomesa/fs/tools/compact/PartitionInputFormat.scala b/geomesa-fs/geomesa-fs-tools/src/main/scala/org/locationtech/geomesa/fs/tools/compact/PartitionInputFormat.scala index f20617dd4722..0ea346b7943c 100644 --- a/geomesa-fs/geomesa-fs-tools/src/main/scala/org/locationtech/geomesa/fs/tools/compact/PartitionInputFormat.scala +++ b/geomesa-fs/geomesa-fs-tools/src/main/scala/org/locationtech/geomesa/fs/tools/compact/PartitionInputFormat.scala @@ -19,6 +19,7 @@ import org.locationtech.geomesa.fs.storage.api._ import org.locationtech.geomesa.fs.storage.common.SizeableFileSystemStorage import org.locationtech.geomesa.fs.storage.common.jobs.StorageConfiguration import org.locationtech.geomesa.fs.storage.common.utils.PathCache +import org.locationtech.geomesa.fs.storage.converter.pathfilter.PathFiltering import org.locationtech.geomesa.fs.tools.compact.PartitionInputFormat.{PartitionInputSplit, PartitionRecordReader} import org.locationtech.geomesa.utils.io.{CloseWithLogging, WithClose} @@ -133,8 +134,7 @@ object PartitionInputFormat { } // use a cached metadata impl instead of reloading val data = PartitionMetadata(partition, files, None, 0L) - val cached = new CachedMetadata(metadata.sft, metadata.encoding, metadata.scheme, metadata.leafStorage, - metadata.pathFilterFactory, data) + val cached = new CachedMetadata(metadata.sft, metadata.encoding, metadata.scheme, metadata.leafStorage, data) storage = FileSystemStorageFactory(fsc, cached) reader = storage.getReader(new Query("", Filter.INCLUDE), Option(partition)) metadata.close() @@ -164,7 +164,6 @@ object PartitionInputFormat { val encoding: String, val scheme: PartitionScheme, val leafStorage: Boolean, - val pathFilterFactory: Option[PathFilterFactory], partition: PartitionMetadata ) extends StorageMetadata { override def getPartition(name: String): Option[PartitionMetadata] =