From 158a8878c38b83afe692d32e292ee11f16a8e19b Mon Sep 17 00:00:00 2001 From: Oleksii Moskalenko Date: Wed, 23 Dec 2020 16:47:33 +0800 Subject: [PATCH] generalize ingestion pipeline metrics Signed-off-by: Oleksii Moskalenko --- .../scala/feast/ingestion/BasePipeline.scala | 2 +- .../scala/feast/ingestion/BatchPipeline.scala | 8 +-- .../feast/ingestion/StreamingPipeline.scala | 7 +-- .../metrics/IngestionPipelineMetrics.scala | 53 +++++++++++++++++++ .../deadletters/DeadLetterMetrics.scala | 47 ---------------- ...la => IngestionPipelineMetricSource.scala} | 13 +++-- .../source/RedisSinkMetricSource.scala | 4 +- .../feast/ingestion/BatchPipelineIT.scala | 5 +- .../feast/ingestion/metrics/StatsDStub.scala | 31 +++++++++++ .../ingestion/metrics/StatsReporterSpec.scala | 31 +---------- 10 files changed, 109 insertions(+), 92 deletions(-) create mode 100644 spark/ingestion/src/main/scala/feast/ingestion/metrics/IngestionPipelineMetrics.scala delete mode 100644 spark/ingestion/src/main/scala/feast/ingestion/stores/deadletters/DeadLetterMetrics.scala rename spark/ingestion/src/main/scala/org/apache/spark/metrics/source/{DeadLetterSinkMetricSource.scala => IngestionPipelineMetricSource.scala} (64%) create mode 100644 spark/ingestion/src/test/scala/feast/ingestion/metrics/StatsDStub.scala diff --git a/spark/ingestion/src/main/scala/feast/ingestion/BasePipeline.scala b/spark/ingestion/src/main/scala/feast/ingestion/BasePipeline.scala index e1f5a59285..13dd51f280 100644 --- a/spark/ingestion/src/main/scala/feast/ingestion/BasePipeline.scala +++ b/spark/ingestion/src/main/scala/feast/ingestion/BasePipeline.scala @@ -55,7 +55,7 @@ trait BasePipeline { .set("spark.metrics.conf.*.sink.statsd.port", c.port.toString) .set("spark.metrics.conf.*.sink.statsd.period", "30") .set("spark.metrics.conf.*.sink.statsd.unit", "seconds") - .set("spark.metrics.namespace", jobConfig.mode.toString.toLowerCase) + .set("spark.metrics.namespace", s"feast_${jobConfig.mode.toString.toLowerCase}") // until proto parser udf will be fixed, we have to use this .set("spark.sql.legacy.allowUntypedScalaUDF", "true") case None => () diff --git a/spark/ingestion/src/main/scala/feast/ingestion/BatchPipeline.scala b/spark/ingestion/src/main/scala/feast/ingestion/BatchPipeline.scala index c79a50952e..3b8675f8ec 100644 --- a/spark/ingestion/src/main/scala/feast/ingestion/BatchPipeline.scala +++ b/spark/ingestion/src/main/scala/feast/ingestion/BatchPipeline.scala @@ -16,9 +16,9 @@ */ package feast.ingestion +import feast.ingestion.metrics.IngestionPipelineMetrics import feast.ingestion.sources.bq.BigQueryReader import feast.ingestion.sources.file.FileReader -import feast.ingestion.stores.deadletters.DeadLetterMetrics import feast.ingestion.validation.{RowValidator, TypeCheck} import org.apache.commons.lang.StringUtils import org.apache.spark.SparkEnv @@ -59,6 +59,8 @@ object BatchPipeline extends BasePipeline { val projected = input.select(projection: _*).cache() + implicit def rowEncoder: Encoder[Row] = RowEncoder(projected.schema) + TypeCheck.allTypesMatch(projected.schema, featureTable) match { case Some(error) => throw new RuntimeException(s"Dataframe columns don't match expected feature types: $error") @@ -66,6 +68,7 @@ object BatchPipeline extends BasePipeline { } val validRows = projected + .mapPartitions(IngestionPipelineMetrics.incrementRead) .filter(rowValidator.allChecks) validRows.write @@ -77,12 +80,11 @@ object BatchPipeline extends BasePipeline { .option("max_age", config.featureTable.maxAge.getOrElse(0L)) .save() - implicit def rowEncoder: Encoder[Row] = RowEncoder(projected.schema) config.deadLetterPath match { case Some(path) => projected .filter(!rowValidator.allChecks) - .mapPartitions(iter => DeadLetterMetrics.incrementCount(iter)) + .mapPartitions(IngestionPipelineMetrics.incrementDeadletters) .write .format("parquet") .mode(SaveMode.Append) diff --git a/spark/ingestion/src/main/scala/feast/ingestion/StreamingPipeline.scala b/spark/ingestion/src/main/scala/feast/ingestion/StreamingPipeline.scala index 9f88cf96cf..c6ed87df85 100644 --- a/spark/ingestion/src/main/scala/feast/ingestion/StreamingPipeline.scala +++ b/spark/ingestion/src/main/scala/feast/ingestion/StreamingPipeline.scala @@ -19,10 +19,10 @@ package feast.ingestion import java.io.File import java.util.concurrent.TimeUnit +import feast.ingestion.metrics.IngestionPipelineMetrics import feast.ingestion.registry.proto.ProtoRegistryFactory import org.apache.spark.sql.{DataFrame, Encoder, Row, SaveMode, SparkSession} import org.apache.spark.sql.functions.{expr, struct, udf} -import feast.ingestion.stores.deadletters.DeadLetterMetrics import feast.ingestion.utils.ProtoReflection import feast.ingestion.utils.testing.MemoryStreamingSource import feast.ingestion.validation.{RowValidator, TypeCheck} @@ -104,8 +104,10 @@ object StreamingPipeline extends BasePipeline with Serializable { batchDF.withColumn("_isValid", rowValidator.allChecks) } rowsAfterValidation.persist() + implicit def rowEncoder: Encoder[Row] = RowEncoder(rowsAfterValidation.schema) rowsAfterValidation + .mapPartitions(IngestionPipelineMetrics.incrementRead) .filter(if (config.doNotIngestInvalidRows) expr("_isValid") else rowValidator.allChecks) .write .format("feast.ingestion.stores.redis") @@ -116,13 +118,12 @@ object StreamingPipeline extends BasePipeline with Serializable { .option("max_age", config.featureTable.maxAge.getOrElse(0L)) .save() - implicit def rowEncoder: Encoder[Row] = RowEncoder(projected.schema) config.deadLetterPath match { case Some(path) => rowsAfterValidation .filter("!_isValid") - .mapPartitions(iter => DeadLetterMetrics.incrementCount(iter)) + .mapPartitions(IngestionPipelineMetrics.incrementDeadletters) .write .format("parquet") .mode(SaveMode.Append) diff --git a/spark/ingestion/src/main/scala/feast/ingestion/metrics/IngestionPipelineMetrics.scala b/spark/ingestion/src/main/scala/feast/ingestion/metrics/IngestionPipelineMetrics.scala new file mode 100644 index 0000000000..060547e611 --- /dev/null +++ b/spark/ingestion/src/main/scala/feast/ingestion/metrics/IngestionPipelineMetrics.scala @@ -0,0 +1,53 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.ingestion.metrics + +import org.apache.spark.SparkEnv +import org.apache.spark.metrics.source.IngestionPipelineMetricSource + +object IngestionPipelineMetrics { + def incrementDeadletters[A](rowIterator: Iterator[A]): Iterator[A] = { + if (metricSource.nonEmpty) + metricSource.get.METRIC_DEADLETTER_ROWS_INSERTED.inc(rowIterator.length) + + rowIterator + } + + def incrementRead[A](rowIterator: Iterator[A]): Iterator[A] = { + if (metricSource.nonEmpty) + metricSource.get.METRIC_ROWS_READ_FROM_SOURCE.inc(rowIterator.length) + + rowIterator + } + + private lazy val metricSource: Option[IngestionPipelineMetricSource] = { + this.synchronized { + if ( + SparkEnv.get.metricsSystem + .getSourcesByName(IngestionPipelineMetricSource.sourceName) + .isEmpty + ) { + SparkEnv.get.metricsSystem.registerSource(new IngestionPipelineMetricSource) + } + } + + SparkEnv.get.metricsSystem.getSourcesByName(IngestionPipelineMetricSource.sourceName) match { + case Seq(head) => Some(head.asInstanceOf[IngestionPipelineMetricSource]) + case _ => None + } + } +} diff --git a/spark/ingestion/src/main/scala/feast/ingestion/stores/deadletters/DeadLetterMetrics.scala b/spark/ingestion/src/main/scala/feast/ingestion/stores/deadletters/DeadLetterMetrics.scala deleted file mode 100644 index 4d28032e09..0000000000 --- a/spark/ingestion/src/main/scala/feast/ingestion/stores/deadletters/DeadLetterMetrics.scala +++ /dev/null @@ -1,47 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * Copyright 2018-2020 The Feast Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package feast.ingestion.stores.deadletters - -import org.apache.spark.SparkEnv -import org.apache.spark.metrics.source.DeadLetterSinkMetricSource -import org.apache.spark.sql.Row - -object DeadLetterMetrics { - def incrementCount(rowIterator: Iterator[Row]) = { - val res = rowIterator - .map(row => { - metricSource.get.METRIC_DEADLETTER_ROWS_INSERTED.inc() - row - }) - res - } - - private lazy val metricSource: Option[DeadLetterSinkMetricSource] = { - this.synchronized { - if ( - SparkEnv.get.metricsSystem.getSourcesByName(DeadLetterSinkMetricSource.sourceName).isEmpty - ) { - SparkEnv.get.metricsSystem.registerSource(new DeadLetterSinkMetricSource) - } - } - - SparkEnv.get.metricsSystem.getSourcesByName(DeadLetterSinkMetricSource.sourceName) match { - case Seq(head) => Some(head.asInstanceOf[DeadLetterSinkMetricSource]) - case _ => None - } - } -} diff --git a/spark/ingestion/src/main/scala/org/apache/spark/metrics/source/DeadLetterSinkMetricSource.scala b/spark/ingestion/src/main/scala/org/apache/spark/metrics/source/IngestionPipelineMetricSource.scala similarity index 64% rename from spark/ingestion/src/main/scala/org/apache/spark/metrics/source/DeadLetterSinkMetricSource.scala rename to spark/ingestion/src/main/scala/org/apache/spark/metrics/source/IngestionPipelineMetricSource.scala index 49cbeee713..6710619f4c 100644 --- a/spark/ingestion/src/main/scala/org/apache/spark/metrics/source/DeadLetterSinkMetricSource.scala +++ b/spark/ingestion/src/main/scala/org/apache/spark/metrics/source/IngestionPipelineMetricSource.scala @@ -16,13 +16,16 @@ */ package org.apache.spark.metrics.source -class DeadLetterSinkMetricSource extends BaseMetricSource { - override val sourceName: String = DeadLetterSinkMetricSource.sourceName +class IngestionPipelineMetricSource extends BaseMetricSource { + override val sourceName: String = IngestionPipelineMetricSource.sourceName val METRIC_DEADLETTER_ROWS_INSERTED = - metricRegistry.counter(counterWithLabels("feast_ingestion_deadletter_count")) + metricRegistry.counter(counterWithLabels("deadletter_count")) + + val METRIC_ROWS_READ_FROM_SOURCE = + metricRegistry.counter(counterWithLabels("read_from_source_count")) } -object DeadLetterSinkMetricSource { - val sourceName = "deadletter_sink" +object IngestionPipelineMetricSource { + val sourceName = "ingestion_pipeline" } diff --git a/spark/ingestion/src/main/scala/org/apache/spark/metrics/source/RedisSinkMetricSource.scala b/spark/ingestion/src/main/scala/org/apache/spark/metrics/source/RedisSinkMetricSource.scala index 09fddd892a..e5949d47bb 100644 --- a/spark/ingestion/src/main/scala/org/apache/spark/metrics/source/RedisSinkMetricSource.scala +++ b/spark/ingestion/src/main/scala/org/apache/spark/metrics/source/RedisSinkMetricSource.scala @@ -20,10 +20,10 @@ class RedisSinkMetricSource extends BaseMetricSource { override val sourceName: String = RedisSinkMetricSource.sourceName val METRIC_TOTAL_ROWS_INSERTED = - metricRegistry.counter(counterWithLabels("feast_ingestion_feature_row_ingested_count")) + metricRegistry.counter(counterWithLabels("feature_row_ingested_count")) val METRIC_ROWS_LAG = - metricRegistry.histogram(metricWithLabels("feast_ingestion_feature_row_lag_ms")) + metricRegistry.histogram(metricWithLabels("feature_row_lag_ms")) } object RedisSinkMetricSource { diff --git a/spark/ingestion/src/test/scala/feast/ingestion/BatchPipelineIT.scala b/spark/ingestion/src/test/scala/feast/ingestion/BatchPipelineIT.scala index 85db649433..f3054c0423 100644 --- a/spark/ingestion/src/test/scala/feast/ingestion/BatchPipelineIT.scala +++ b/spark/ingestion/src/test/scala/feast/ingestion/BatchPipelineIT.scala @@ -30,6 +30,7 @@ import org.scalatest._ import redis.clients.jedis.Jedis import feast.ingestion.helpers.RedisStorageHelper._ import feast.ingestion.helpers.DataHelper._ +import feast.ingestion.metrics.StatsDStub import feast.proto.storage.RedisProto.RedisKeyV2 import feast.proto.types.ValueProto import org.apache.spark.sql.Encoder @@ -55,6 +56,7 @@ class BatchPipelineIT extends SparkSpec with ForAllTestContainer { jedis.flushAll() implicit def testRowEncoder: Encoder[TestRow] = ExpressionEncoder() + val statsDStub = new StatsDStub def rowGenerator(start: DateTime, end: DateTime, customerGen: Option[Gen[String]] = None) = for { @@ -95,7 +97,8 @@ class BatchPipelineIT extends SparkSpec with ForAllTestContainer { ) ), startTime = DateTime.parse("2020-08-01"), - endTime = DateTime.parse("2020-09-01") + endTime = DateTime.parse("2020-09-01"), + metrics = Some(StatsDConfig(host="localhost", port=statsDStub.port)) ) } diff --git a/spark/ingestion/src/test/scala/feast/ingestion/metrics/StatsDStub.scala b/spark/ingestion/src/test/scala/feast/ingestion/metrics/StatsDStub.scala new file mode 100644 index 0000000000..29ddcb75d7 --- /dev/null +++ b/spark/ingestion/src/test/scala/feast/ingestion/metrics/StatsDStub.scala @@ -0,0 +1,31 @@ +package feast.ingestion.metrics + +import java.net.{DatagramPacket, DatagramSocket, SocketTimeoutException} + +import scala.collection.mutable.ArrayBuffer + +class StatsDStub { + val socket = new DatagramSocket() + socket.setSoTimeout(100) + + def port: Int = socket.getLocalPort + + def receive: Array[String] = { + val messages: ArrayBuffer[String] = ArrayBuffer() + var finished = false + + do { + val buf = new Array[Byte](65535) + val p = new DatagramPacket(buf, buf.length) + try { + socket.receive(p) + } catch { + case _: SocketTimeoutException => + finished = true + } + messages += new String(p.getData, 0, p.getLength) + } while (!finished) + + messages.toArray + } +} diff --git a/spark/ingestion/src/test/scala/feast/ingestion/metrics/StatsReporterSpec.scala b/spark/ingestion/src/test/scala/feast/ingestion/metrics/StatsReporterSpec.scala index 3b674de8b7..b531b87a40 100644 --- a/spark/ingestion/src/test/scala/feast/ingestion/metrics/StatsReporterSpec.scala +++ b/spark/ingestion/src/test/scala/feast/ingestion/metrics/StatsReporterSpec.scala @@ -16,46 +16,17 @@ */ package feast.ingestion.metrics -import java.net.{DatagramPacket, DatagramSocket, SocketTimeoutException} import java.util import java.util.Collections import com.codahale.metrics.{Gauge, Histogram, MetricRegistry, UniformReservoir} import feast.ingestion.UnitSpec -import scala.collection.mutable.ArrayBuffer import scala.jdk.CollectionConverters._ class StatsReporterSpec extends UnitSpec { - class SimpleServer { - val socket = new DatagramSocket() - socket.setSoTimeout(100) - - def port: Int = socket.getLocalPort - - def receive: Array[String] = { - val messages: ArrayBuffer[String] = ArrayBuffer() - var finished = false - - do { - val buf = new Array[Byte](65535) - val p = new DatagramPacket(buf, buf.length) - try { - socket.receive(p) - } catch { - case _: SocketTimeoutException => { - finished = true - } - } - messages += new String(p.getData, 0, p.getLength) - } while (!finished) - - messages.toArray - } - } - trait Scope { - val server = new SimpleServer + val server = new StatsDStub val reporter = new StatsdReporterWithTags( new MetricRegistry, "127.0.0.1",