diff --git a/spark/ingestion/src/main/scala/feast/ingestion/sources/file/FileReader.scala b/spark/ingestion/src/main/scala/feast/ingestion/sources/file/FileReader.scala
index 5bad029b5b..55d5c901ea 100644
--- a/spark/ingestion/src/main/scala/feast/ingestion/sources/file/FileReader.scala
+++ b/spark/ingestion/src/main/scala/feast/ingestion/sources/file/FileReader.scala
@@ -16,7 +16,7 @@
  */
 package feast.ingestion.sources.file
 
-import java.sql.Timestamp
+import java.sql.{Timestamp, Date}
 
 import feast.ingestion.FileSource
 import org.apache.spark.sql.functions.col
@@ -30,9 +30,17 @@ object FileReader {
       start: DateTime,
       end: DateTime
   ): DataFrame = {
-    sqlContext.read
+    val reader = sqlContext.read
       .parquet(source.path)
       .filter(col(source.eventTimestampColumn) >= new Timestamp(start.getMillis))
       .filter(col(source.eventTimestampColumn) < new Timestamp(end.getMillis))
+
+    source.datePartitionColumn match {
+      case Some(partitionColumn) if partitionColumn.nonEmpty =>
+        reader
+          .filter(col(partitionColumn) >= new Date(start.getMillis))
+          .filter(col(partitionColumn) <= new Date(end.getMillis))
+      case _ => reader
+    }
   }
 }
diff --git a/spark/ingestion/src/test/scala/feast/ingestion/BatchPipelineIT.scala b/spark/ingestion/src/test/scala/feast/ingestion/BatchPipelineIT.scala
index 0e9a96eb42..e0f57f6f0d 100644
--- a/spark/ingestion/src/test/scala/feast/ingestion/BatchPipelineIT.scala
+++ b/spark/ingestion/src/test/scala/feast/ingestion/BatchPipelineIT.scala
@@ -110,7 +110,7 @@ class BatchPipelineIT extends SparkSpec with ForAllTestContainer {
     val rows = generateDistinctRows(gen, 10000, groupByEntity)
     val tempPath = storeAsParquet(sparkSession, rows)
     val configWithOfflineSource = config.copy(
-      source = FileSource(tempPath, Map.empty, "eventTimestamp")
+      source = FileSource(tempPath, Map.empty, "eventTimestamp", datePartitionColumn = Some("date"))
    )
 
     BatchPipeline.createPipeline(sparkSession, configWithOfflineSource)
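
Note: below is a minimal, self-contained sketch of what the changed FileReader logic does, for readers who want to try it outside the Feast codebase. It assumes a local stand-in FileSource case class (the real one lives in feast.ingestion) and an illustrative dataset path and partition column name ("events", "date") that are not taken from the diff. The point of the change: when a date partition column is configured, two extra coarse filters on that column let Spark prune whole partition directories, while the original event-timestamp filters still bound the rows exactly. The partition upper bound is inclusive (<=) where the timestamp bound is exclusive (<), because rows strictly before `end` can still live in `end`'s date partition.

    import java.sql.{Date, Timestamp}
    import org.apache.spark.sql.{DataFrame, SparkSession}
    import org.apache.spark.sql.functions.col
    import org.joda.time.DateTime

    // Hypothetical stand-in mirroring only the fields the reader uses.
    case class FileSource(
        path: String,
        eventTimestampColumn: String,
        datePartitionColumn: Option[String] = None
    )

    def readBatch(
        spark: SparkSession,
        source: FileSource,
        start: DateTime,
        end: DateTime
    ): DataFrame = {
      // Row-level bounds on the event timestamp, as in the diff.
      val df = spark.read
        .parquet(source.path)
        .filter(col(source.eventTimestampColumn) >= new Timestamp(start.getMillis))
        .filter(col(source.eventTimestampColumn) < new Timestamp(end.getMillis))

      // Coarse bounds on the date partition column, if one is configured;
      // these are what enable partition pruning on date-partitioned layouts.
      source.datePartitionColumn match {
        case Some(partitionColumn) if partitionColumn.nonEmpty =>
          df.filter(col(partitionColumn) >= new Date(start.getMillis))
            .filter(col(partitionColumn) <= new Date(end.getMillis))
        case _ => df
      }
    }

    // Illustrative usage: one day of data from a dataset laid out as
    // .../events/date=YYYY-MM-DD/part-*.parquet (path and column are assumptions).
    // readBatch(spark,
    //   FileSource("s3://bucket/events", "eventTimestamp", Some("date")),
    //   DateTime.parse("2020-10-01"), DateTime.parse("2020-10-02"))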