From ad673927bc4400edd4955328913944c7a1c2ce30 Mon Sep 17 00:00:00 2001 From: Oleksii Moskalenko Date: Wed, 2 Dec 2020 12:18:50 +0800 Subject: [PATCH] Features are not being ingested due to max age overflow (#1209) Signed-off-by: Oleksii Moskalenko --- .../src/main/scala/feast/ingestion/BatchPipeline.scala | 2 +- .../src/main/scala/feast/ingestion/IngestionJobConfig.scala | 2 +- .../src/main/scala/feast/ingestion/StreamingPipeline.scala | 2 +- .../scala/feast/ingestion/stores/redis/SparkRedisConfig.scala | 4 ++-- .../src/test/scala/feast/ingestion/BatchPipelineIT.scala | 4 ++-- 5 files changed, 7 insertions(+), 7 deletions(-) diff --git a/spark/ingestion/src/main/scala/feast/ingestion/BatchPipeline.scala b/spark/ingestion/src/main/scala/feast/ingestion/BatchPipeline.scala index 1348914b86..f35458e958 100644 --- a/spark/ingestion/src/main/scala/feast/ingestion/BatchPipeline.scala +++ b/spark/ingestion/src/main/scala/feast/ingestion/BatchPipeline.scala @@ -72,7 +72,7 @@ object BatchPipeline extends BasePipeline { .option("namespace", featureTable.name) .option("project_name", featureTable.project) .option("timestamp_column", config.source.eventTimestampColumn) - .option("max_age", config.featureTable.maxAge.getOrElse(0)) + .option("max_age", config.featureTable.maxAge.getOrElse(0L)) .save() config.deadLetterPath match { diff --git a/spark/ingestion/src/main/scala/feast/ingestion/IngestionJobConfig.scala b/spark/ingestion/src/main/scala/feast/ingestion/IngestionJobConfig.scala index ac7ef15d82..109c98f09a 100644 --- a/spark/ingestion/src/main/scala/feast/ingestion/IngestionJobConfig.scala +++ b/spark/ingestion/src/main/scala/feast/ingestion/IngestionJobConfig.scala @@ -95,7 +95,7 @@ case class FeatureTable( project: String, entities: Seq[Field], features: Seq[Field], - maxAge: Option[Int] = None + maxAge: Option[Long] = None ) case class IngestionJobConfig( diff --git a/spark/ingestion/src/main/scala/feast/ingestion/StreamingPipeline.scala b/spark/ingestion/src/main/scala/feast/ingestion/StreamingPipeline.scala index 99d5e66c88..49eb5b169f 100644 --- a/spark/ingestion/src/main/scala/feast/ingestion/StreamingPipeline.scala +++ b/spark/ingestion/src/main/scala/feast/ingestion/StreamingPipeline.scala @@ -87,7 +87,7 @@ object StreamingPipeline extends BasePipeline with Serializable { .option("namespace", featureTable.name) .option("project_name", featureTable.project) .option("timestamp_column", config.source.eventTimestampColumn) - .option("max_age", config.featureTable.maxAge.getOrElse(0)) + .option("max_age", config.featureTable.maxAge.getOrElse(0L)) .save() config.deadLetterPath match { diff --git a/spark/ingestion/src/main/scala/feast/ingestion/stores/redis/SparkRedisConfig.scala b/spark/ingestion/src/main/scala/feast/ingestion/stores/redis/SparkRedisConfig.scala index cac12a6c27..8892b48643 100644 --- a/spark/ingestion/src/main/scala/feast/ingestion/stores/redis/SparkRedisConfig.scala +++ b/spark/ingestion/src/main/scala/feast/ingestion/stores/redis/SparkRedisConfig.scala @@ -24,7 +24,7 @@ case class SparkRedisConfig( iteratorGroupingSize: Int = 1000, timestampPrefix: String = "_ts", repartitionByEntity: Boolean = true, - maxAge: Int = 0, + maxAge: Long = 0, expiryPrefix: String = "_ex" ) @@ -43,6 +43,6 @@ object SparkRedisConfig { entityColumns = parameters.getOrElse(ENTITY_COLUMNS, "").split(","), timestampColumn = parameters.getOrElse(TS_COLUMN, "event_timestamp"), repartitionByEntity = parameters.getOrElse(ENTITY_REPARTITION, "true") == "true", - maxAge = parameters.get(MAX_AGE).map(_.toInt).getOrElse(0) + maxAge = parameters.get(MAX_AGE).map(_.toLong).getOrElse(0) ) } diff --git a/spark/ingestion/src/test/scala/feast/ingestion/BatchPipelineIT.scala b/spark/ingestion/src/test/scala/feast/ingestion/BatchPipelineIT.scala index 70f8a1f718..7f5e4b2122 100644 --- a/spark/ingestion/src/test/scala/feast/ingestion/BatchPipelineIT.scala +++ b/spark/ingestion/src/test/scala/feast/ingestion/BatchPipelineIT.scala @@ -131,7 +131,7 @@ class BatchPipelineIT extends SparkSpec with ForAllTestContainer { val gen = rowGenerator(startDate, endDate) val rows = generateDistinctRows(gen, 1000, groupByEntity) val tempPath = storeAsParquet(sparkSession, rows) - val maxAge = 86400 * 2 + val maxAge = 86400L * 30 val configWithMaxAge = config.copy( source = FileSource(tempPath, Map.empty, "eventTimestamp"), featureTable = config.featureTable.copy(maxAge = Some(maxAge)), @@ -162,7 +162,7 @@ class BatchPipelineIT extends SparkSpec with ForAllTestContainer { }) - val increasedMaxAge = 86400 * 3 + val increasedMaxAge = 86400L * 60 val configWithSecondFeatureTable = config.copy( source = FileSource(tempPath, Map.empty, "eventTimestamp"), featureTable = config.featureTable.copy(