
Commit

Address MR comments 1
Forrest Feaser committed Dec 13, 2024
1 parent 3c77df2 commit a530c4c
Showing 19 changed files with 103 additions and 113 deletions.

This file was deleted.

@@ -53,13 +53,6 @@ trait StorageMetadata extends Compactable with Closeable {
    */
   def leafStorage: Boolean
 
-  /**
-   * The path filter factory used to filter paths during queries
-   *
-   * @return path filter factory
-   */
-  def pathFilterFactory: Option[PathFilterFactory]
-
   /**
    * Get a previously set key-value pair
    *

This file was deleted.

@@ -127,10 +127,9 @@ abstract class AbstractFileSystemStorage(
         fp.partitions.mkString(", "))
       reader
     }
-    val pathFilter = metadata.pathFilterFactory.map(_.apply(fp.filter))
     // each partition must be read separately, to ensure modifications are handled correctly
-    fp.partitions.iterator.flatMap { path =>
-      val files = getFilePaths(path).filter(p => pathFilter.forall(_.accept(p.path)))
+    fp.partitions.iterator.flatMap { p =>
+      val files = getFilePaths(p)
       if (files.isEmpty) { Iterator.empty } else { Iterator.single(reader -> files) }
     }
   }
@@ -86,7 +86,6 @@ class FileBasedMetadata(
   override val scheme: PartitionScheme = PartitionSchemeFactory.load(sft, meta.scheme)
   override val encoding: String = meta.config(Metadata.Encoding)
   override val leafStorage: Boolean = meta.config(Metadata.LeafStorage).toBoolean
-  override val pathFilterFactory: Option[PathFilterFactory] = None
 
   private val kvs = new ConcurrentHashMap[String, String](meta.config.asJava)
 
@@ -12,7 +12,7 @@ import com.typesafe.scalalogging.LazyLogging
 import org.apache.commons.dbcp2.{PoolableConnection, PoolingDataSource}
 import org.geotools.api.feature.simple.SimpleFeatureType
 import org.locationtech.geomesa.fs.storage.api.StorageMetadata.{PartitionMetadata, StorageFile, StorageFileAction}
-import org.locationtech.geomesa.fs.storage.api.{Metadata, PartitionScheme, PartitionSchemeFactory, PathFilterFactory, StorageMetadata}
+import org.locationtech.geomesa.fs.storage.api.{Metadata, PartitionScheme, PartitionSchemeFactory, StorageMetadata}
 import org.locationtech.geomesa.fs.storage.common.metadata.JdbcMetadata.MetadataTable
 import org.locationtech.geomesa.utils.io.WithClose
 import org.locationtech.geomesa.utils.text.StringSerialization
@@ -84,7 +84,6 @@ class JdbcMetadata(
   override val scheme: PartitionScheme = PartitionSchemeFactory.load(sft, meta.scheme)
   override val encoding: String = meta.config(Metadata.Encoding)
   override val leafStorage: Boolean = meta.config(Metadata.LeafStorage).toBoolean
-  override val pathFilterFactory: Option[PathFilterFactory] = None
 
   private val kvs = new ConcurrentHashMap[String, String](meta.config.asJava)
 
@@ -0,0 +1 @@
org.locationtech.geomesa.fs.storage.converter.pathfilter.DtgPathFiltering$DtgPathFilteringFactory
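
The one-line file above is a Java ServiceLoader registration: by convention it would live under META-INF/services and be named after the PathFilteringFactory trait (the file path itself is not shown in this view), and its single line names the factory class nested inside the DtgPathFiltering object, hence the $ in the class name. A minimal sketch of the lookup, assuming only standard ServiceLoader behaviour:

object ListPathFilteringFactories extends App {
  import java.util.ServiceLoader
  import scala.collection.JavaConverters._
  import org.locationtech.geomesa.fs.storage.converter.pathfilter.PathFilteringFactory

  // enumerate every registered factory; with the registration above on the classpath, the output
  // should include org.locationtech.geomesa.fs.storage.converter.pathfilter.DtgPathFiltering$DtgPathFilteringFactory
  ServiceLoader.load(classOf[PathFilteringFactory]).asScala.foreach(f => println(f.getClass.getName))
}
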
@@ -10,13 +10,14 @@ package org.locationtech.geomesa.fs.storage.converter
 
 import com.typesafe.scalalogging.StrictLogging
 import org.apache.commons.compress.archivers.ArchiveStreamFactory
-import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.fs.{FileSystem, Path, PathFilter}
 import org.geotools.api.feature.simple.{SimpleFeature, SimpleFeatureType}
 import org.geotools.api.filter.Filter
 import org.locationtech.geomesa.convert.EvaluationContext
 import org.locationtech.geomesa.convert2.SimpleFeatureConverter
 import org.locationtech.geomesa.features.{ScalaSimpleFeature, TransformSimpleFeature}
 import org.locationtech.geomesa.fs.storage.common.AbstractFileSystemStorage.FileSystemPathReader
+import org.locationtech.geomesa.fs.storage.converter.pathfilter.PathFiltering
 import org.locationtech.geomesa.utils.collection.CloseableIterator
 import org.locationtech.geomesa.utils.hadoop.HadoopDelegate.{HadoopFileHandle, HadoopTarHandle, HadoopZipHandle}
 import org.locationtech.geomesa.utils.io.PathUtils
@@ -28,27 +29,34 @@ class ConverterFileSystemReader(
     fs: FileSystem,
     converter: SimpleFeatureConverter,
     filter: Option[Filter],
-    transform: Option[(String, SimpleFeatureType)]
+    transform: Option[(String, SimpleFeatureType)],
+    pathFiltering: Option[PathFiltering]
   ) extends FileSystemPathReader with StrictLogging {
 
   import ArchiveStreamFactory.{JAR, TAR, ZIP}
 
+  private lazy val pathFilter: Option[PathFilter] = pathFiltering.flatMap(pf => filter.map(pf.apply))
+
   override def read(path: Path): CloseableIterator[SimpleFeature] = {
-    logger.debug(s"Opening file $path")
-    val iter = try {
-      val handle = PathUtils.getUncompressedExtension(path.getName).toLowerCase(Locale.US) match {
-        case TAR => new HadoopTarHandle(fs, path)
-        case ZIP | JAR => new HadoopZipHandle(fs, path)
-        case _ => new HadoopFileHandle(fs, path)
-      }
-      handle.open.flatMap { case (name, is) =>
-        val params = EvaluationContext.inputFileParam(name.getOrElse(handle.path))
-        converter.process(is, converter.createEvaluationContext(params))
-      }
-    } catch {
-      case NonFatal(e) => logger.error(s"Error processing uri '$path'", e); CloseableIterator.empty
-    }
-    transformed(filtered(iter))
+    if (pathFilter.forall(_.accept(path))) {
+      logger.debug(s"Opening file $path")
+      val iter = try {
+        val handle = PathUtils.getUncompressedExtension(path.getName).toLowerCase(Locale.US) match {
+          case TAR => new HadoopTarHandle(fs, path)
+          case ZIP | JAR => new HadoopZipHandle(fs, path)
+          case _ => new HadoopFileHandle(fs, path)
+        }
+        handle.open.flatMap { case (name, is) =>
+          val params = EvaluationContext.inputFileParam(name.getOrElse(handle.path))
+          converter.process(is, converter.createEvaluationContext(params))
+        }
+      } catch {
+        case NonFatal(e) => logger.error(s"Error processing uri '$path'", e); CloseableIterator.empty
+      }
+      transformed(filtered(iter))
+    } else {
+      CloseableIterator.empty
+    }
   }
 
   private def filtered(in: CloseableIterator[SimpleFeature]): CloseableIterator[SimpleFeature] = {
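
One behavioural note on the guard introduced above: pathFilter is an Option, and Option.forall returns true when the option is empty, so a reader constructed with pathFiltering = None, or queried without a filter to derive bounds from, opens every path exactly as before. A small self-contained illustration of that semantics (the path literals are arbitrary):

import org.apache.hadoop.fs.{Path, PathFilter}

// with no PathFilter configured, forall has nothing to check and returns true, so every file is read
val noFilter: Option[PathFilter] = None
assert(noFilter.forall(_.accept(new Path("/tmp/any-file.csv"))))

// with a filter present, only accepted paths are opened
val csvOnly: Option[PathFilter] = Some(new PathFilter {
  override def accept(path: Path): Boolean = path.getName.endsWith(".csv")
})
assert(csvOnly.forall(_.accept(new Path("/data/2024121205.csv"))))
assert(!csvOnly.forall(_.accept(new Path("/data/2024121205.avro"))))
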
@@ -22,8 +22,7 @@ class ConverterMetadata(
     context: FileSystemContext,
     val sft: SimpleFeatureType,
     val scheme: PartitionScheme,
-    val leafStorage: Boolean,
-    val pathFilterFactory: Option[PathFilterFactory]
+    val leafStorage: Boolean
   ) extends StorageMetadata {
 
   private val dirty = new AtomicBoolean(false)
@@ -55,20 +55,7 @@ class ConverterMetadataFactory extends StorageMetadataFactory with LazyLogging {
         deprecated.getOrElse(true)
       }
 
-      val pathFilterFactoryOpts =
-        context.conf.getValByRegex(Pattern.quote(PathFilterOptsPrefix) + ".*").asScala.map {
-          case (k, v) => k.substring(PathFilterOptsPrefix.length) -> v
-        }
-
-      val pathFilterFactory = Option(context.conf.get(PathFilterName)).flatMap { name =>
-        val factory = PathFilterFactoryFactory.load(NamedOptions(name, pathFilterFactoryOpts.toMap))
-        if (factory.isEmpty) {
-          logger.warn(s"Failed to load ${classOf[PathFilterFactory].getName} for config '$name'")
-        }
-        factory
-      }
-
-      Some(new ConverterMetadata(context, sft, scheme, leafStorage, pathFilterFactory))
+      Some(new ConverterMetadata(context, sft, scheme, leafStorage))
     } catch {
       case e: IllegalArgumentException => logger.warn(s"Couldn't create converter storage metadata: $e", e); None
     }
@@ -19,8 +19,12 @@ import org.locationtech.geomesa.fs.storage.common.AbstractFileSystemStorage
 import org.locationtech.geomesa.fs.storage.common.AbstractFileSystemStorage.FileSystemPathReader
 import org.locationtech.geomesa.fs.storage.common.observer.FileSystemObserver
 import org.locationtech.geomesa.fs.storage.common.utils.PathCache
+import org.locationtech.geomesa.fs.storage.converter.pathfilter.PathFiltering
 
-class ConverterStorage(context: FileSystemContext, metadata: StorageMetadata, converter: SimpleFeatureConverter)
+class ConverterStorage(context: FileSystemContext,
+                       metadata: StorageMetadata,
+                       converter: SimpleFeatureConverter,
+                       pathFiltering: Option[PathFiltering])
     extends AbstractFileSystemStorage(context, metadata, "") {
 
   // TODO close converter...
@@ -36,7 +40,7 @@ class ConverterStorage(context: FileSystemContext, metadata: StorageMetadata, co
   override protected def createReader(
       filter: Option[Filter],
       transform: Option[(String, SimpleFeatureType)]): FileSystemPathReader = {
-    new ConverterFileSystemReader(context.fs, converter, filter, transform)
+    new ConverterFileSystemReader(context.fs, converter, filter, transform, pathFiltering)
   }
 
   override def getFilePaths(partition: String): Seq[StorageFilePath] = {
@@ -12,10 +12,15 @@ import com.typesafe.scalalogging.LazyLogging
 import org.locationtech.geomesa.convert.{ConfArgs, ConverterConfigResolver}
 import org.locationtech.geomesa.convert2.SimpleFeatureConverter
 import org.locationtech.geomesa.fs.storage.api._
-import org.locationtech.geomesa.fs.storage.converter.ConverterStorageFactory.{ConverterConfigParam, ConverterNameParam}
+import org.locationtech.geomesa.fs.storage.converter.ConverterStorageFactory._
+import org.locationtech.geomesa.fs.storage.converter.pathfilter.{PathFiltering, PathFilteringFactory}
 
+import java.util.regex.Pattern
+
 class ConverterStorageFactory extends FileSystemStorageFactory with LazyLogging {
 
+  import scala.collection.JavaConverters._
+
   override val encoding: String = "converter"
 
   override def apply(context: FileSystemContext, metadata: StorageMetadata): FileSystemStorage = {
@@ -29,9 +34,22 @@ class ConverterStorageFactory extends FileSystemStorageFactory with LazyLogging
       }
       SimpleFeatureConverter(metadata.sft, converterConfig)
     }
-    new ConverterStorage(context, metadata, converter)
-  }
+
+    val pathFilteringOpts =
+      context.conf.getValByRegex(Pattern.quote(PathFilterOptsPrefix) + ".*").asScala.map {
+        case (k, v) => k.substring(PathFilterOptsPrefix.length) -> v
+      }
+
+    val pathFiltering = Option(context.conf.get(PathFilterName)).flatMap { name =>
+      val factory = PathFilteringFactory.load(NamedOptions(name, pathFilteringOpts.toMap))
+      if (factory.isEmpty) {
+        logger.warn(s"Failed to load ${classOf[PathFiltering].getName} for config '$name'")
+      }
+      factory
+    }
 
+    new ConverterStorage(context, metadata, converter, pathFiltering)
+  }
 }
 
 object ConverterStorageFactory {
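
With this change, the storage factory (rather than the metadata) reads the path-filter configuration from the Hadoop Configuration: PathFilterName selects a PathFiltering implementation by name, and every key under PathFilterOptsPrefix is stripped of the prefix and passed through as options. A configuration sketch; the literal key strings behind these constants live in the ConverterStorageFactory companion object and are not visible in this hunk, so the constants are referenced directly rather than guessed, and the Config member access is an assumption based on the hunks below:

import org.apache.hadoop.conf.Configuration
import org.locationtech.geomesa.fs.storage.converter.ConverterStorageFactory.{PathFilterName, PathFilterOptsPrefix}
import org.locationtech.geomesa.fs.storage.converter.pathfilter.DtgPathFiltering

val conf = new Configuration()
conf.set(PathFilterName, DtgPathFiltering.Name) // selects the "dtg" implementation
conf.set(s"$PathFilterOptsPrefix${DtgPathFiltering.Config.Attribute}", "Dtg")
conf.set(s"$PathFilterOptsPrefix${DtgPathFiltering.Config.Buffer}", "6 hours")
// DtgPathFilteringFactory also requires pattern and format options; their key constants are
// elided in this view, so they are omitted here
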
@@ -6,23 +6,23 @@
  * http://www.opensource.org/licenses/apache2.0.php.
  ***********************************************************************/
 
-package org.locationtech.geomesa.fs.storage.common.pathfilters
+package org.locationtech.geomesa.fs.storage.converter.pathfilter
 
 import com.typesafe.scalalogging.LazyLogging
 import org.apache.hadoop.fs.PathFilter
 import org.geotools.api.filter.Filter
 import org.locationtech.geomesa.filter.Bounds.Bound
 import org.locationtech.geomesa.filter.{Bounds, FilterHelper}
-import org.locationtech.geomesa.fs.storage.api.{NamedOptions, PathFilterFactory, PathFilterFactoryFactory}
+import org.locationtech.geomesa.fs.storage.api.NamedOptions
 
 import java.time.format.DateTimeFormatter
 import java.time.{ZoneOffset, ZonedDateTime}
 import java.util.regex.Pattern
 import scala.concurrent.duration.Duration
 import scala.util.control.NonFatal
 
-class DtgPathFilterFactory(attribute: String, pattern: Pattern, format: DateTimeFormatter, buffer: Duration)
-    extends PathFilterFactory with LazyLogging {
+class DtgPathFiltering(attribute: String, pattern: Pattern, format: DateTimeFormatter, buffer: Duration)
+    extends PathFiltering with LazyLogging {
 
   def apply(filter: Filter): PathFilter = {
     val filterIntervals = FilterHelper.extractIntervals(filter, attribute, handleExclusiveBounds = true)
@@ -54,7 +54,7 @@ class DtgPathFilterFactory(attribute: String, pattern: Pattern, format: DateTime
   }
 }
 
-object DtgPathFilterFactory extends LazyLogging {
+object DtgPathFiltering extends LazyLogging {
 
   val Name = "dtg"
 
@@ -65,8 +65,8 @@ object DtgPathFilterFactory extends LazyLogging {
     val Buffer = "buffer"
   }
 
-  class DtgPathFilterFactoryFactory extends PathFilterFactoryFactory {
-    override def load(config: NamedOptions): Option[PathFilterFactory] = {
+  class DtgPathFilteringFactory extends PathFilteringFactory {
+    override def load(config: NamedOptions): Option[PathFiltering] = {
       if (config.name != Name) { None } else {
         val attribute = config.options.getOrElse(Config.Attribute, null)
         require(attribute != null, s"$Name path filter requires a dtg attribute config '${Config.Attribute}'")
@@ -80,7 +80,7 @@ object DtgPathFilterFactory extends LazyLogging {
         val pattern = Pattern.compile(patternConfig)
         val format = DateTimeFormatter.ofPattern(formatConfig).withZone(ZoneOffset.UTC)
         val buffer = Duration.apply(bufferConfig)
-        Some(new DtgPathFilterFactory(attribute, pattern, format, buffer))
+        Some(new DtgPathFiltering(attribute, pattern, format, buffer))
       }
     }
   }
@@ -6,11 +6,11 @@
  * http://www.opensource.org/licenses/apache2.0.php.
  ***********************************************************************/
 
-package org.locationtech.geomesa.fs.storage.api
+package org.locationtech.geomesa.fs.storage.converter.pathfilter
 
 import org.apache.hadoop.fs.PathFilter
 import org.geotools.api.filter.Filter
 
-trait PathFilterFactory {
+trait PathFiltering {
   def apply(filter: Filter): PathFilter
 }
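
The relocated trait is deliberately small: given the query Filter, produce a Hadoop PathFilter. A minimal, hypothetical implementation, purely illustrative and not part of this commit, that ignores the query and matches file names against a fixed regex (placed in the same package only for the sake of the sketch):

package org.locationtech.geomesa.fs.storage.converter.pathfilter

import java.util.regex.Pattern

import org.apache.hadoop.fs.{Path, PathFilter}
import org.geotools.api.filter.Filter

// hypothetical example: accept only file names matching the supplied pattern, regardless of the
// query filter; real implementations such as DtgPathFiltering (earlier in this commit) derive the
// predicate from the query filter instead
class RegexPathFiltering(pattern: Pattern) extends PathFiltering {
  override def apply(filter: Filter): PathFilter = new PathFilter {
    override def accept(path: Path): Boolean = pattern.matcher(path.getName).matches()
  }
}
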
@@ -0,0 +1,26 @@
/***********************************************************************
 * Copyright (c) 2013-2024 Commonwealth Computer Research, Inc.
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Apache License, Version 2.0
 * which accompanies this distribution and is available at
 * http://www.opensource.org/licenses/apache2.0.php.
 ***********************************************************************/

package org.locationtech.geomesa.fs.storage.converter.pathfilter

import org.locationtech.geomesa.fs.storage.api.NamedOptions

import java.util.ServiceLoader

trait PathFilteringFactory {
  def load(config: NamedOptions): Option[PathFiltering]
}

object PathFilteringFactory {

  import scala.collection.JavaConverters._

  private lazy val factories = ServiceLoader.load(classOf[PathFilteringFactory]).asScala.toSeq

  def load(config: NamedOptions): Option[PathFiltering] = factories.toStream.flatMap(_.load(config)).headOption
}
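
Putting the pieces together, a hedged end-to-end sketch of how a query filter becomes a file filter through this factory. Only the name "dtg" and the "buffer" option key are visible in this commit; the attribute, pattern and format key names and all of the values below are assumptions for illustration:

import org.geotools.filter.text.ecql.ECQL
import org.locationtech.geomesa.fs.storage.api.NamedOptions
import org.locationtech.geomesa.fs.storage.converter.pathfilter.PathFilteringFactory

val opts = NamedOptions("dtg", Map(
  "attribute" -> "Dtg",
  "pattern"   -> "^data-(\\d{10})\\.csv$", // hypothetical file-name pattern with a timestamp capture group
  "format"    -> "yyyyMMddHH",             // hypothetical format for parsing the captured timestamp
  "buffer"    -> "6 hours"
))
val filtering = PathFilteringFactory.load(opts).getOrElse(sys.error("dtg path filtering not found on the classpath"))
val pathFilter = filtering.apply(ECQL.toFilter("Dtg DURING 2024-12-11T00:00:00.000Z/2024-12-12T00:00:00.000Z"))
// pathFilter.accept(somePath) can now prune files whose embedded timestamp falls outside the query window
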
@@ -1,3 +1,3 @@
 ID,Name,Age,Dtg,Lon,Dat
 8,Dean,17,2024-12-11T23:50:00.000Z,0.0,0.0
-9,Minerva,17,2024-12-12T00:30:00.000Z,0.0,0.0
+9,Minerva,57,2024-12-12T00:30:00.000Z,0.0,0.0
@@ -1,3 +1,3 @@
 ID,Name,Age,Dtg,Lon,Dat
 10,Luna,17,2024-12-12T03:00:00.000Z,0.0,0.0
-11,Dudley,17,2024-12-12T05:00:00.000Z,0.0,0.0
+11,Dudley,19,2024-12-12T05:00:00.000Z,0.0,0.0
