generalize ingestion pipeline metrics
Signed-off-by: Oleksii Moskalenko <[email protected]>
pyalex committed Dec 23, 2020
1 parent a498dae commit 158a887
Showing 10 changed files with 109 additions and 92 deletions.
@@ -55,7 +55,7 @@ trait BasePipeline {
.set("spark.metrics.conf.*.sink.statsd.port", c.port.toString)
.set("spark.metrics.conf.*.sink.statsd.period", "30")
.set("spark.metrics.conf.*.sink.statsd.unit", "seconds")
.set("spark.metrics.namespace", jobConfig.mode.toString.toLowerCase)
.set("spark.metrics.namespace", s"feast_${jobConfig.mode.toString.toLowerCase}")
// until proto parser udf will be fixed, we have to use this
.set("spark.sql.legacy.allowUntypedScalaUDF", "true")
case None => ()
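
For context (not part of the diff): Spark composes metric identifiers roughly as <spark.metrics.namespace>.<instance>.<source name>.<metric name>, so prefixing the namespace with feast_ scopes every counter emitted by the sources in this commit under a feast-specific name on the StatsD side. A hedged illustration, with the mode, the instance, and any label suffix added by counterWithLabels left as placeholders:

// Illustrative only — actual names depend on the deployment (driver vs. an executor id)
// and on how counterWithLabels encodes labels:
val exampleMetricName = "feast_<mode>.driver.ingestion_pipeline.read_from_source_count"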
@@ -16,9 +16,9 @@
*/
package feast.ingestion

import feast.ingestion.metrics.IngestionPipelineMetrics
import feast.ingestion.sources.bq.BigQueryReader
import feast.ingestion.sources.file.FileReader
import feast.ingestion.stores.deadletters.DeadLetterMetrics
import feast.ingestion.validation.{RowValidator, TypeCheck}
import org.apache.commons.lang.StringUtils
import org.apache.spark.SparkEnv
@@ -59,13 +59,16 @@ object BatchPipeline extends BasePipeline {

val projected = input.select(projection: _*).cache()

implicit def rowEncoder: Encoder[Row] = RowEncoder(projected.schema)

TypeCheck.allTypesMatch(projected.schema, featureTable) match {
case Some(error) =>
throw new RuntimeException(s"Dataframe columns don't match expected feature types: $error")
case _ => ()
}

val validRows = projected
.mapPartitions(IngestionPipelineMetrics.incrementRead)
.filter(rowValidator.allChecks)

validRows.write
@@ -77,12 +80,11 @@ object BatchPipeline extends BasePipeline {
.option("max_age", config.featureTable.maxAge.getOrElse(0L))
.save()

implicit def rowEncoder: Encoder[Row] = RowEncoder(projected.schema)
config.deadLetterPath match {
case Some(path) =>
projected
.filter(!rowValidator.allChecks)
.mapPartitions(iter => DeadLetterMetrics.incrementCount(iter))
.mapPartitions(IngestionPipelineMetrics.incrementDeadletters)
.write
.format("parquet")
.mode(SaveMode.Append)
@@ -19,10 +19,10 @@ package feast.ingestion
import java.io.File
import java.util.concurrent.TimeUnit

import feast.ingestion.metrics.IngestionPipelineMetrics
import feast.ingestion.registry.proto.ProtoRegistryFactory
import org.apache.spark.sql.{DataFrame, Encoder, Row, SaveMode, SparkSession}
import org.apache.spark.sql.functions.{expr, struct, udf}
import feast.ingestion.stores.deadletters.DeadLetterMetrics
import feast.ingestion.utils.ProtoReflection
import feast.ingestion.utils.testing.MemoryStreamingSource
import feast.ingestion.validation.{RowValidator, TypeCheck}
@@ -104,8 +104,10 @@ object StreamingPipeline extends BasePipeline with Serializable {
batchDF.withColumn("_isValid", rowValidator.allChecks)
}
rowsAfterValidation.persist()
implicit def rowEncoder: Encoder[Row] = RowEncoder(rowsAfterValidation.schema)

rowsAfterValidation
.mapPartitions(IngestionPipelineMetrics.incrementRead)
.filter(if (config.doNotIngestInvalidRows) expr("_isValid") else rowValidator.allChecks)
.write
.format("feast.ingestion.stores.redis")
@@ -116,13 +118,12 @@
.option("max_age", config.featureTable.maxAge.getOrElse(0L))
.save()

implicit def rowEncoder: Encoder[Row] = RowEncoder(projected.schema)
config.deadLetterPath match {
case Some(path) =>

rowsAfterValidation
.filter("!_isValid")
.mapPartitions(iter => DeadLetterMetrics.incrementCount(iter))
.mapPartitions(IngestionPipelineMetrics.incrementDeadletters)
.write
.format("parquet")
.mode(SaveMode.Append)
@@ -0,0 +1,53 @@
/*
* SPDX-License-Identifier: Apache-2.0
* Copyright 2018-2020 The Feast Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package feast.ingestion.metrics

import org.apache.spark.SparkEnv
import org.apache.spark.metrics.source.IngestionPipelineMetricSource

object IngestionPipelineMetrics {
def incrementDeadletters[A](rowIterator: Iterator[A]): Iterator[A] = {
if (metricSource.nonEmpty)
metricSource.get.METRIC_DEADLETTER_ROWS_INSERTED.inc(rowIterator.length)

rowIterator
}

def incrementRead[A](rowIterator: Iterator[A]): Iterator[A] = {
if (metricSource.nonEmpty)
metricSource.get.METRIC_ROWS_READ_FROM_SOURCE.inc(rowIterator.length)

rowIterator
}

private lazy val metricSource: Option[IngestionPipelineMetricSource] = {
this.synchronized {
if (
SparkEnv.get.metricsSystem
.getSourcesByName(IngestionPipelineMetricSource.sourceName)
.isEmpty
) {
SparkEnv.get.metricsSystem.registerSource(new IngestionPipelineMetricSource)
}
}

SparkEnv.get.metricsSystem.getSourcesByName(IngestionPipelineMetricSource.sourceName) match {
case Seq(head) => Some(head.asInstanceOf[IngestionPipelineMetricSource])
case _ => None
}
}
}
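
One caveat worth flagging for the helpers above (editorial note, not part of the commit): Scala's Iterator.length consumes the iterator, so whenever metricSource is defined, the iterator handed back to mapPartitions would already be exhausted. A minimal sketch of a variant that materializes the partition before counting — the method name is hypothetical:

def incrementReadMaterialized[A](rowIterator: Iterator[A]): Iterator[A] = {
  val rows = rowIterator.toVector // count without exhausting what we return
  if (metricSource.nonEmpty)
    metricSource.get.METRIC_ROWS_READ_FROM_SOURCE.inc(rows.length)
  rows.iterator
}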

This file was deleted.

@@ -16,13 +16,16 @@
*/
package org.apache.spark.metrics.source

class DeadLetterSinkMetricSource extends BaseMetricSource {
override val sourceName: String = DeadLetterSinkMetricSource.sourceName
class IngestionPipelineMetricSource extends BaseMetricSource {
override val sourceName: String = IngestionPipelineMetricSource.sourceName

val METRIC_DEADLETTER_ROWS_INSERTED =
metricRegistry.counter(counterWithLabels("feast_ingestion_deadletter_count"))
metricRegistry.counter(counterWithLabels("deadletter_count"))

val METRIC_ROWS_READ_FROM_SOURCE =
metricRegistry.counter(counterWithLabels("read_from_source_count"))
}

object DeadLetterSinkMetricSource {
val sourceName = "deadletter_sink"
object IngestionPipelineMetricSource {
val sourceName = "ingestion_pipeline"
}
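
Because the source is no longer deadletter-specific, additional pipeline-level counters can sit next to the two above. A hypothetical example (not part of this commit) that would go in the class body and reuse the same counterWithLabels pattern:

val METRIC_ROWS_FAILED_VALIDATION =
  metricRegistry.counter(counterWithLabels("failed_validation_count"))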
@@ -20,10 +20,10 @@ class RedisSinkMetricSource extends BaseMetricSource {
override val sourceName: String = RedisSinkMetricSource.sourceName

val METRIC_TOTAL_ROWS_INSERTED =
metricRegistry.counter(counterWithLabels("feast_ingestion_feature_row_ingested_count"))
metricRegistry.counter(counterWithLabels("feature_row_ingested_count"))

val METRIC_ROWS_LAG =
metricRegistry.histogram(metricWithLabels("feast_ingestion_feature_row_lag_ms"))
metricRegistry.histogram(metricWithLabels("feature_row_lag_ms"))
}

object RedisSinkMetricSource {
@@ -30,6 +30,7 @@ import org.scalatest._
import redis.clients.jedis.Jedis
import feast.ingestion.helpers.RedisStorageHelper._
import feast.ingestion.helpers.DataHelper._
import feast.ingestion.metrics.StatsDStub
import feast.proto.storage.RedisProto.RedisKeyV2
import feast.proto.types.ValueProto
import org.apache.spark.sql.Encoder
@@ -55,6 +56,7 @@ class BatchPipelineIT extends SparkSpec with ForAllTestContainer {
jedis.flushAll()

implicit def testRowEncoder: Encoder[TestRow] = ExpressionEncoder()
val statsDStub = new StatsDStub

def rowGenerator(start: DateTime, end: DateTime, customerGen: Option[Gen[String]] = None) =
for {
@@ -95,7 +97,8 @@ class BatchPipelineIT extends SparkSpec with ForAllTestContainer {
)
),
startTime = DateTime.parse("2020-08-01"),
endTime = DateTime.parse("2020-09-01")
endTime = DateTime.parse("2020-09-01"),
metrics = Some(StatsDConfig(host="localhost", port=statsDStub.port))
)
}
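
With the StatsD sink now pointed at the stub, a test can drain the received datagrams and assert that the generalized counters were emitted. A hedged sketch (line format and metric names assumed from the sources added in this commit, not taken from this diff):

// StatsD counter lines look like "<name>:<value>|c"; the new source emits
// read_from_source_count and deadletter_count.
val received = statsDStub.receive
assert(received.exists(_.contains("read_from_source_count")))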

@@ -0,0 +1,31 @@
package feast.ingestion.metrics

import java.net.{DatagramPacket, DatagramSocket, SocketTimeoutException}

import scala.collection.mutable.ArrayBuffer

class StatsDStub {
val socket = new DatagramSocket()
socket.setSoTimeout(100)

def port: Int = socket.getLocalPort

def receive: Array[String] = {
val messages: ArrayBuffer[String] = ArrayBuffer()
var finished = false

do {
val buf = new Array[Byte](65535)
val p = new DatagramPacket(buf, buf.length)
try {
socket.receive(p)
} catch {
case _: SocketTimeoutException =>
finished = true
}
messages += new String(p.getData, 0, p.getLength)
} while (!finished)

messages.toArray
}
}
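
A quick usage sketch for the stub above (illustrative, not part of the diff): send one UDP datagram to its port and drain what was received. Note that the receive loop, as written, also appends the contents of the final timed-out packet, so the last entry of the returned array is a NUL-padded empty string.

import java.net.{DatagramPacket, DatagramSocket, InetAddress}

val stub   = new StatsDStub
val client = new DatagramSocket()
val line   = "ingestion_pipeline.read_from_source_count:1|c".getBytes
client.send(new DatagramPacket(line, line.length, InetAddress.getLoopbackAddress, stub.port))
stub.receive.foreach(println) // the sent line, followed by the trailing empty entry
client.close()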
@@ -16,46 +16,17 @@
*/
package feast.ingestion.metrics

import java.net.{DatagramPacket, DatagramSocket, SocketTimeoutException}
import java.util
import java.util.Collections

import com.codahale.metrics.{Gauge, Histogram, MetricRegistry, UniformReservoir}
import feast.ingestion.UnitSpec

import scala.collection.mutable.ArrayBuffer
import scala.jdk.CollectionConverters._

class StatsReporterSpec extends UnitSpec {
class SimpleServer {
val socket = new DatagramSocket()
socket.setSoTimeout(100)

def port: Int = socket.getLocalPort

def receive: Array[String] = {
val messages: ArrayBuffer[String] = ArrayBuffer()
var finished = false

do {
val buf = new Array[Byte](65535)
val p = new DatagramPacket(buf, buf.length)
try {
socket.receive(p)
} catch {
case _: SocketTimeoutException => {
finished = true
}
}
messages += new String(p.getData, 0, p.getLength)
} while (!finished)

messages.toArray
}
}

trait Scope {
val server = new SimpleServer
val server = new StatsDStub
val reporter = new StatsdReporterWithTags(
new MetricRegistry,
"127.0.0.1",
