Skip to content

Commit

Permalink
Features are not being ingested due to max age overflow (#1209)
Browse files Browse the repository at this point in the history
Signed-off-by: Oleksii Moskalenko <[email protected]>
  • Loading branch information
pyalex authored Dec 2, 2020
1 parent 3826bd9 commit ad67392
Show file tree
Hide file tree
Showing 5 changed files with 7 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ object BatchPipeline extends BasePipeline {
.option("namespace", featureTable.name)
.option("project_name", featureTable.project)
.option("timestamp_column", config.source.eventTimestampColumn)
.option("max_age", config.featureTable.maxAge.getOrElse(0))
.option("max_age", config.featureTable.maxAge.getOrElse(0L))
.save()

config.deadLetterPath match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ case class FeatureTable(
project: String,
entities: Seq[Field],
features: Seq[Field],
maxAge: Option[Int] = None
maxAge: Option[Long] = None
)

case class IngestionJobConfig(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ object StreamingPipeline extends BasePipeline with Serializable {
.option("namespace", featureTable.name)
.option("project_name", featureTable.project)
.option("timestamp_column", config.source.eventTimestampColumn)
.option("max_age", config.featureTable.maxAge.getOrElse(0))
.option("max_age", config.featureTable.maxAge.getOrElse(0L))
.save()

config.deadLetterPath match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ case class SparkRedisConfig(
iteratorGroupingSize: Int = 1000,
timestampPrefix: String = "_ts",
repartitionByEntity: Boolean = true,
maxAge: Int = 0,
maxAge: Long = 0,
expiryPrefix: String = "_ex"
)

Expand All @@ -43,6 +43,6 @@ object SparkRedisConfig {
entityColumns = parameters.getOrElse(ENTITY_COLUMNS, "").split(","),
timestampColumn = parameters.getOrElse(TS_COLUMN, "event_timestamp"),
repartitionByEntity = parameters.getOrElse(ENTITY_REPARTITION, "true") == "true",
maxAge = parameters.get(MAX_AGE).map(_.toInt).getOrElse(0)
maxAge = parameters.get(MAX_AGE).map(_.toLong).getOrElse(0)
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ class BatchPipelineIT extends SparkSpec with ForAllTestContainer {
val gen = rowGenerator(startDate, endDate)
val rows = generateDistinctRows(gen, 1000, groupByEntity)
val tempPath = storeAsParquet(sparkSession, rows)
val maxAge = 86400 * 2
val maxAge = 86400L * 30
val configWithMaxAge = config.copy(
source = FileSource(tempPath, Map.empty, "eventTimestamp"),
featureTable = config.featureTable.copy(maxAge = Some(maxAge)),
Expand Down Expand Up @@ -162,7 +162,7 @@ class BatchPipelineIT extends SparkSpec with ForAllTestContainer {

})

val increasedMaxAge = 86400 * 3
val increasedMaxAge = 86400L * 60
val configWithSecondFeatureTable = config.copy(
source = FileSource(tempPath, Map.empty, "eventTimestamp"),
featureTable = config.featureTable.copy(
Expand Down

0 comments on commit ad67392

Please sign in to comment.