Expires Redis Keys based on Feature Table Max Age (#1161)
* Expires Redis Keys based on Feature Table Max Age

Signed-off-by: Khor Shu Heng <[email protected]>

* Remove redundant generics, move expiry event timestamp computation to RedisSinkRelation

Signed-off-by: Khor Shu Heng <[email protected]>

* Persist the key if the expiry date is equal to max timestamp

Signed-off-by: Khor Shu Heng <[email protected]>

* Add more test cases

Signed-off-by: Khor Shu Heng <[email protected]>

Co-authored-by: Khor Shu Heng <[email protected]>
khorshuheng authored Nov 24, 2020
1 parent 877d8e2 commit 60b568d
Showing 12 changed files with 502 additions and 81 deletions.
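The rule this commit introduces, as described in the commit message above: when a feature table declares a max age, each Redis key expires at the row's event timestamp plus that max age; when no max age is configured, the key is kept indefinitely. A minimal sketch of that rule — the helper name expiryFor and its parameters are illustrative, not part of the diff:

import java.sql.Timestamp

// Hypothetical helper restating the expiry rule from the commit message.
// A max age of 0 stands for "not set" and maps to the sentinel
// maxExpiryTimestamp, which the sink treats as "persist the key, no TTL".
def expiryFor(
    eventTime: Timestamp,
    maxAgeSeconds: Long,
    maxExpiryTimestamp: Timestamp
): Timestamp =
  if (maxAgeSeconds > 0) new Timestamp(eventTime.getTime + maxAgeSeconds * 1000)
  else maxExpiryTimestamp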
4 changes: 2 additions & 2 deletions spark/ingestion/pom.xml
@@ -182,14 +182,14 @@
<dependency>
<groupId>com.dimafeng</groupId>
<artifactId>testcontainers-scala-scalatest_${scala.version}</artifactId>
<version>0.38.3</version>
<version>0.38.6</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.dimafeng</groupId>
<artifactId>testcontainers-scala-kafka_${scala.version}</artifactId>
<version>0.38.3</version>
<version>0.38.6</version>
<scope>test</scope>
</dependency>

@@ -72,6 +72,7 @@ object BatchPipeline extends BasePipeline {
.option("namespace", featureTable.name)
.option("project_name", featureTable.project)
.option("timestamp_column", config.source.eventTimestampColumn)
.option("max_age", config.featureTable.maxAge.getOrElse(0))
.save()

config.deadLetterPath match {
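Both BatchPipeline and StreamingPipeline now forward the feature table's max age to the Redis sink as a writer option, with 0 standing in for "no max age". A hedged sketch of the full write call — the DataFrame df and the format identifier are placeholders; only the option keys and the getOrElse(0) default come from the diff:

// Sketch only: `df` is a DataFrame of feature rows; the format string is assumed.
df.write
  .format("feast.ingestion.stores.redis")
  .option("namespace", featureTable.name)
  .option("project_name", featureTable.project)
  .option("timestamp_column", config.source.eventTimestampColumn)
  .option("max_age", config.featureTable.maxAge.getOrElse(0))
  .save()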
@@ -17,7 +17,6 @@
package feast.ingestion

import org.joda.time.DateTime

import org.json4s._
import org.json4s.jackson.JsonMethods.{parse => parseJSON}
import org.json4s.ext.JavaEnumNameSerializer
@@ -29,7 +28,7 @@ object IngestionJob {
new JavaEnumNameSerializer[feast.proto.types.ValueProto.ValueType.Enum]() +
ShortTypeHints(List(classOf[ProtoFormat], classOf[AvroFormat]))

val parser = new scopt.OptionParser[IngestionJobConfig]("IngestionJon") {
val parser = new scopt.OptionParser[IngestionJobConfig]("IngestionJob") {
// ToDo: read version from Manifest
head("feast.ingestion.IngestionJob", "0.9.0-SNAPSHOT")

@@ -91,7 +91,8 @@ case class FeatureTable(
name: String,
project: String,
entities: Seq[Field],
features: Seq[Field]
features: Seq[Field],
maxAge: Option[Int] = None
)

case class IngestionJobConfig(
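The new maxAge field is optional, so existing feature table specs keep working: None means "never expire", while Some(seconds) bounds how long ingested keys live in Redis. A small illustration with made-up values — the entity and feature lists are left empty here purely to keep the snippet short:

// Hypothetical feature table with a one-hour max age.
val withMaxAge = FeatureTable(
  name = "driver_statistics",
  project = "default",
  entities = Seq.empty,
  features = Seq.empty,
  maxAge = Some(3600)
)

// Omitting maxAge defaults to None, which the pipelines translate to 0
// ("no expiry") when handing the value to the Redis sink:
val maxAgeForSink: Int = withMaxAge.maxAge.getOrElse(0)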
@@ -87,6 +87,7 @@ object StreamingPipeline extends BasePipeline with Serializable {
.option("namespace", featureTable.name)
.option("project_name", featureTable.project)
.option("timestamp_column", config.source.eventTimestampColumn)
.option("max_age", config.featureTable.maxAge.getOrElse(0))
.save()

config.deadLetterPath match {
@@ -16,16 +16,17 @@
*/
package feast.ingestion.stores.redis

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import redis.clients.jedis.{Pipeline, Response}
import java.nio.charset.StandardCharsets
import java.util

import com.google.common.hash.Hashing

import scala.jdk.CollectionConverters._
import com.google.protobuf.Timestamp
import feast.ingestion.utils.TypeConversion
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import redis.clients.jedis.{Pipeline, Response}

import scala.jdk.CollectionConverters._

/**
* Use Redis hash type as storage layout. Every feature is stored as separate entry in Hash.
@@ -35,10 +36,10 @@ import feast.ingestion.utils.TypeConversion
* Values are serialized with protobuf (`ValueProto`).
*/
class HashTypePersistence(config: SparkRedisConfig) extends Persistence with Serializable {
def encodeRow(
keyColumns: Array[String],
timestampField: String,
value: Row

private def encodeRow(
value: Row,
maxExpiryTimestamp: java.sql.Timestamp
): Map[Array[Byte], Array[Byte]] = {
val fields = value.schema.fields.map(_.name)
val types = value.schema.fields.map(f => (f.name, f.dataType)).toMap
@@ -51,49 +52,87 @@ class HashTypePersistence(config: SparkRedisConfig) extends Persistence with Ser
}
.filter { case (k, _) =>
// don't store entities & timestamp
!keyColumns.contains(k) && k != config.timestampColumn
!config.entityColumns.contains(k) && k != config.timestampColumn
}
.map { case (k, v) =>
encodeKey(k) -> encodeValue(v, types(k))
}

val timestamp = Seq(
val timestampHash = Seq(
(
timestampField.getBytes,
timestampHashKey(config.namespace).getBytes,
encodeValue(value.getAs[Timestamp](config.timestampColumn), TimestampType)
)
)

values ++ timestamp
val expiryUnixTimestamp = {
if (config.maxAge > 0)
value.getAs[java.sql.Timestamp](config.timestampColumn).getTime + config.maxAge * 1000
else maxExpiryTimestamp.getTime
}
val expiryTimestamp = new java.sql.Timestamp(expiryUnixTimestamp)
val expiryTimestampHash = Seq(
(
expiryTimestampHashKey(config.namespace).getBytes,
encodeValue(expiryTimestamp, TimestampType)
)
)

values ++ timestampHash ++ expiryTimestampHash
}

def encodeValue(value: Any, `type`: DataType): Array[Byte] = {
private def encodeValue(value: Any, `type`: DataType): Array[Byte] = {
TypeConversion.sqlTypeToProtoValue(value, `type`).toByteArray
}

def encodeKey(key: String): Array[Byte] = {
private def encodeKey(key: String): Array[Byte] = {
val fullFeatureReference = s"${config.namespace}:$key"
Hashing.murmur3_32.hashString(fullFeatureReference, StandardCharsets.UTF_8).asBytes()
}

def save(
private def timestampHashKey(namespace: String): String = {
s"${config.timestampPrefix}:${namespace}"
}

private def expiryTimestampHashKey(namespace: String): String = {
s"${config.expiryPrefix}:${namespace}"
}

private def decodeTimestamp(encodedTimestamp: Array[Byte]): java.sql.Timestamp = {
new java.sql.Timestamp(Timestamp.parseFrom(encodedTimestamp).getSeconds * 1000)
}

override def save(
pipeline: Pipeline,
key: Array[Byte],
value: Map[Array[Byte], Array[Byte]],
ttl: Int
row: Row,
expiryTimestamp: java.sql.Timestamp,
maxExpiryTimestamp: java.sql.Timestamp
): Unit = {
pipeline.hset(key, value.asJava)
if (ttl > 0) {
pipeline.expire(key, ttl)
val value = encodeRow(row, maxExpiryTimestamp).asJava
pipeline.hset(key, value)
if (expiryTimestamp.equals(maxExpiryTimestamp)) {
pipeline.persist(key)
} else {
pipeline.expireAt(key, expiryTimestamp.getTime / 1000)
}
}

def getTimestamp(
override def get(
pipeline: Pipeline,
key: Array[Byte],
timestampField: String
): Response[Array[Byte]] = {
pipeline.hget(key, timestampField.getBytes)
key: Array[Byte]
): Response[util.Map[Array[Byte], Array[Byte]]] = {
pipeline.hgetAll(key)
}

override def storedTimestamp(
value: util.Map[Array[Byte], Array[Byte]]
): Option[java.sql.Timestamp] = {
value.asScala.toMap
.map { case (key, value) =>
(key.map(_.toChar).mkString, value)
}
.get(timestampHashKey(config.namespace))
.map(value => decodeTimestamp(value))
}
}
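The save path now reduces to three Jedis pipeline calls: write the hash fields, then either clear any existing TTL (when the computed expiry equals the sentinel maxExpiryTimestamp) or set an absolute, second-precision expiry. A condensed restatement of that decision — the helper name writeWithExpiry is hypothetical, and the field map is assumed to have been produced by encodeRow already:

import java.sql.Timestamp
import redis.clients.jedis.Pipeline

// Condensed sketch of HashTypePersistence.save with the encoding step elided.
def writeWithExpiry(
    pipeline: Pipeline,
    key: Array[Byte],
    encodedFields: java.util.Map[Array[Byte], Array[Byte]],
    expiryTimestamp: Timestamp,
    maxExpiryTimestamp: Timestamp
): Unit = {
  pipeline.hset(key, encodedFields)
  if (expiryTimestamp.equals(maxExpiryTimestamp))
    pipeline.persist(key)                                    // no max age: drop any TTL
  else
    pipeline.expireAt(key, expiryTimestamp.getTime / 1000)   // expireAt takes Unix seconds
}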
@@ -16,26 +16,58 @@
*/
package feast.ingestion.stores.redis

import java.sql.Timestamp
import java.util

import org.apache.spark.sql.Row
import redis.clients.jedis.{Pipeline, Response}

/**
* Determine how a Spark row should be serialized and stored on Redis.
*/
trait Persistence {
def encodeRow(
keyColumns: Array[String],
timestampField: String,
value: Row
): Map[Array[Byte], Array[Byte]]

/**
* Persist a Spark row to Redis
*
* @param pipeline Redis pipeline
* @param key Redis key in serialized bytes format
* @param row Row representing the value to be persisted
* @param expiryTimestamp Expiry timestamp for the row
* @param maxExpiryTimestamp No TTL should be set if the expiry timestamp
* is equal to the maxExpiryTimestamp
*/
def save(
pipeline: Pipeline,
key: Array[Byte],
value: Map[Array[Byte], Array[Byte]],
ttl: Int
row: Row,
expiryTimestamp: Timestamp,
maxExpiryTimestamp: Timestamp
): Unit

def getTimestamp(
/**
* Returns a Redis response, which can be used by `storedTimestamp` and `newExpiryTimestamp` to
* derive the currently stored event timestamp, and the updated expiry timestamp. This method will
* be called prior to persisting the row to Redis, so that `RedisSinkRelation` can decide whether
* the currently stored value should be updated.
*
* @param pipeline Redis pipeline
* @param key Redis key in serialized bytes format
* @return Redis response representing the row value
*/
def get(
pipeline: Pipeline,
key: Array[Byte],
timestampField: String
): Response[Array[Byte]]
key: Array[Byte]
): Response[util.Map[Array[Byte], Array[Byte]]]

/**
* Returns the currently stored event timestamp for the key and the feature table associated with the ingestion job.
*
* @param value Response returned from `get`
* @return Stored event timestamp associated with the key. Returns `None` if
* the key is not present in Redis, or if timestamp information is
* unavailable on the stored value.
*/
def storedTimestamp(value: util.Map[Array[Byte], Array[Byte]]): Option[Timestamp]

}
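Taken together, get, storedTimestamp and save let the sink read what is already stored for a key before deciding whether to overwrite it. A hedged sketch of how a caller such as RedisSinkRelation might combine them — the skip-if-older policy and the helper name are assumptions; the relation itself is not shown in this diff:

import java.sql.Timestamp
import org.apache.spark.sql.Row
import redis.clients.jedis.Pipeline

// Read the stored hash, extract the previous event timestamp, and only write
// when the key is new or the incoming row is at least as recent.
def upsertIfNewer(
    persistence: Persistence,
    pipeline: Pipeline,
    key: Array[Byte],
    row: Row,
    eventTimestamp: Timestamp,
    expiryTimestamp: Timestamp,
    maxExpiryTimestamp: Timestamp
): Unit = {
  val storedResponse = persistence.get(pipeline, key)
  pipeline.sync() // materialise the pipelined read before inspecting it
  val previous = persistence.storedTimestamp(storedResponse.get())
  if (previous.forall(!_.after(eventTimestamp))) {
    persistence.save(pipeline, key, row, expiryTimestamp, maxExpiryTimestamp)
  }
}

Splitting the read (get, storedTimestamp) from the write (save) keeps the trait storage-agnostic: an implementation is free to change its hash layout as long as it can still report the stored event timestamp for a key.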