From 542e7d7db8af28e93dc2b34cc3a128f0f19d8fe9 Mon Sep 17 00:00:00 2001
From: Oleksii Moskalenko
Date: Tue, 24 Nov 2020 15:39:54 +0800
Subject: [PATCH] lazy metric registering & metric name filtering (#1195)

Signed-off-by: Oleksii Moskalenko
---
 .../scala/feast/ingestion/BasePipeline.scala  |  8 ++------
 .../metrics/StatsdReporterWithTags.scala      | 17 +++++++++++++----
 .../stores/redis/RedisSinkRelation.scala      | 19 ++++++++++++++++++-
 .../source/RedisSinkMetricSource.scala        | 15 ++++++++++-----
 .../ingestion/metrics/StatsReporterSpec.scala |  6 +++---
 5 files changed, 46 insertions(+), 19 deletions(-)

diff --git a/spark/ingestion/src/main/scala/feast/ingestion/BasePipeline.scala b/spark/ingestion/src/main/scala/feast/ingestion/BasePipeline.scala
index cc1f451ae7..679785bc2a 100644
--- a/spark/ingestion/src/main/scala/feast/ingestion/BasePipeline.scala
+++ b/spark/ingestion/src/main/scala/feast/ingestion/BasePipeline.scala
@@ -41,11 +41,7 @@ trait BasePipeline {
       case Some(c: StatsDConfig) =>
         conf
           .set(
-            "spark.metrics.conf.*.source.redis.class",
-            "org.apache.spark.metrics.source.RedisSinkMetricSource"
-          )
-          .set(
-            "spark.metrics.conf.*.source.redis.labels",
+            "spark.metrics.labels",
             s"feature_table=${jobConfig.featureTable.name}"
           )
           .set(
@@ -56,7 +52,7 @@ trait BasePipeline {
           .set("spark.metrics.conf.*.sink.statsd.port", c.port.toString)
           .set("spark.metrics.conf.*.sink.statsd.period", "30")
           .set("spark.metrics.conf.*.sink.statsd.unit", "seconds")
-          .set("spark.metrics.namespace", jobConfig.mode.toString)
+          .set("spark.metrics.namespace", jobConfig.mode.toString.toLowerCase)
 
       case None => ()
     }
diff --git a/spark/ingestion/src/main/scala/feast/ingestion/metrics/StatsdReporterWithTags.scala b/spark/ingestion/src/main/scala/feast/ingestion/metrics/StatsdReporterWithTags.scala
index 66b48dd444..894014b6fd 100644
--- a/spark/ingestion/src/main/scala/feast/ingestion/metrics/StatsdReporterWithTags.scala
+++ b/spark/ingestion/src/main/scala/feast/ingestion/metrics/StatsdReporterWithTags.scala
@@ -169,19 +169,28 @@ class StatsdReporterWithTags(
     reportMetered(name, timer)
   }
 
+  private val nameWithTag = """(\S+)#(\S+)""".r
+
   private def send(name: String, value: String, metricType: String)(implicit
       socket: DatagramSocket
   ): Unit = {
-    val bytes = sanitize(s"$name:$value|$metricType").getBytes(UTF_8)
+    val bytes = name match {
+      case nameWithTag(name, tags) =>
+        val tagsWithColons = tags.replace('=', ':')
+        sanitize(s"$name:$value|$metricType|#$tagsWithColons").getBytes(UTF_8)
+      case _ =>
+        sanitize(s"$name:$value|$metricType").getBytes(UTF_8)
+    }
     val packet = new DatagramPacket(bytes, bytes.length, address)
     socket.send(packet)
   }
 
-  private val nameWithTag = """(\S+)#(\S+)""".r
-
   private def fullName(name: String, suffixes: String*): String = name match {
     case nameWithTag(name, tags) =>
-      MetricRegistry.name(prefix, name +: suffixes: _*) ++ "#" ++ tags
+      // filter out name parts that consist only of digits
+      // (e.g. an executor id) so that metric names stay stable
+      val stableName = name.split('.').filterNot(_ forall Character.isDigit).mkString(".")
+      MetricRegistry.name(prefix, stableName +: suffixes: _*) ++ "#" ++ tags
     case _ => MetricRegistry.name(prefix, name +: suffixes: _*)
   }
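
Illustration (not part of the patch): the two naming rules introduced above, shown end to end. stableName and buildPayload are hypothetical helpers that collapse fullName and send into one self-contained sketch; in the reporter itself the digit filtering runs before suffixes such as p95 are appended, but the observable result is the same.

object StatsdFormatExample {
  // a metric name may carry tags after '#', e.g. "a.b.c#key=value,key2=value2"
  private val nameWithTag = """(\S+)#(\S+)""".r

  // drop path segments that are purely numeric (e.g. an executor id),
  // so "prefix.1111.test" becomes "prefix.test"
  def stableName(name: String): String =
    name.split('.').filterNot(_.forall(Character.isDigit)).mkString(".")

  // statsd line with DogStatsD-style tags appended after the metric type;
  // '=' separators in the tags become ':' on the wire
  def buildPayload(name: String, value: String, metricType: String): String =
    name match {
      case nameWithTag(bare, tags) =>
        s"${stableName(bare)}:$value|$metricType|#${tags.replace('=', ':')}"
      case _ =>
        s"$name:$value|$metricType"
    }

  def main(args: Array[String]): Unit = {
    // prints: prefix.test.p95:95.95|ms|#fs:name,job:aaa
    println(buildPayload("prefix.1111.test.p95#fs=name,job=aaa", "95.95", "ms"))
  }
}
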
diff --git a/spark/ingestion/src/main/scala/feast/ingestion/stores/redis/RedisSinkRelation.scala b/spark/ingestion/src/main/scala/feast/ingestion/stores/redis/RedisSinkRelation.scala
index 9be82fbc33..f70af5d7e5 100644
--- a/spark/ingestion/src/main/scala/feast/ingestion/stores/redis/RedisSinkRelation.scala
+++ b/spark/ingestion/src/main/scala/feast/ingestion/stores/redis/RedisSinkRelation.scala
@@ -48,6 +48,9 @@ class RedisSinkRelation(override val sqlContext: SQLContext, config: SparkRedisC
     extends BaseRelation
     with InsertableRelation
     with Serializable {
+
+  import RedisSinkRelation._
+
   private implicit val redisConfig: RedisConfig = {
     new RedisConfig(
       new RedisEndpoint(sqlContext.sparkContext.getConf)
@@ -151,11 +154,21 @@ class RedisSinkRelation(override val sqlContext: SQLContext, config: SparkRedisC
       .build
   }
 
-  private lazy val metricSource: Option[RedisSinkMetricSource] =
+  private lazy val metricSource: Option[RedisSinkMetricSource] = {
+    MetricInitializationLock.synchronized {
+      // RedisSinkMetricSource must be registered on the executor, which requires SparkEnv to exist.
+      // That is problematic at startup, since the metrics system is initialized before SparkEnv is set,
+      // so the source is registered lazily here instead.
+      if (SparkEnv.get.metricsSystem.getSourcesByName(RedisSinkMetricSource.sourceName).isEmpty) {
+        SparkEnv.get.metricsSystem.registerSource(new RedisSinkMetricSource)
+      }
+    }
+
     SparkEnv.get.metricsSystem.getSourcesByName(RedisSinkMetricSource.sourceName) match {
       case Seq(head) => Some(head.asInstanceOf[RedisSinkMetricSource])
       case _         => None
     }
+  }
 
   private def groupKeysByNode(
       nodes: Array[RedisNode],
@@ -202,3 +215,7 @@ class RedisSinkRelation(override val sqlContext: SQLContext, config: SparkRedisC
     }
   }
 }
+
+object RedisSinkRelation {
+  object MetricInitializationLock
+}
diff --git a/spark/ingestion/src/main/scala/org/apache/spark/metrics/source/RedisSinkMetricSource.scala b/spark/ingestion/src/main/scala/org/apache/spark/metrics/source/RedisSinkMetricSource.scala
index 77c9218a7e..bc4747828e 100644
--- a/spark/ingestion/src/main/scala/org/apache/spark/metrics/source/RedisSinkMetricSource.scala
+++ b/spark/ingestion/src/main/scala/org/apache/spark/metrics/source/RedisSinkMetricSource.scala
@@ -17,23 +17,28 @@
 package org.apache.spark.metrics.source
 
 import com.codahale.metrics.MetricRegistry
-import org.apache.spark.{SparkConf, SparkEnv}
+import org.apache.spark.SparkEnv
 
 class RedisSinkMetricSource extends Source {
   override val sourceName: String = RedisSinkMetricSource.sourceName
 
   override val metricRegistry: MetricRegistry = new MetricRegistry
 
-  private val sparkConfig = Option(SparkEnv.get).map(_.conf).getOrElse(new SparkConf(true))
+  private val sparkConfig = SparkEnv.get.conf
 
-  private val metricLabels = sparkConfig.get("spark.metrics.conf.*.source.redis.labels")
+  private val metricLabels = sparkConfig.get("spark.metrics.labels", "")
 
-  private def nameWithLabels(name: String) =
+  private val appId = sparkConfig.get("spark.app.id", "")
+
+  private val executorId = sparkConfig.get("spark.executor.id", "")
+
+  private def nameWithLabels(name: String) = {
     if (metricLabels.isEmpty) {
       name
     } else {
-      s"$name#$metricLabels"
+      s"$name#$metricLabels,job_id=$appId-$executorId"
     }
+  }
 
   val METRIC_TOTAL_ROWS_INSERTED =
     metricRegistry.counter(nameWithLabels("feast_ingestion_feature_row_ingested_count"))
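
Illustration (not part of the patch): the registration pattern above, reduced to its essentials. The source is registered lazily, on first use inside an executor task, because Spark initializes the metrics system before SparkEnv is available; the shared lock object makes the check-then-register step safe when several tasks hit it concurrently. RedisSinkMetricSource itself lives under org.apache.spark.metrics.source so it can extend Spark's private[spark] Source trait.

import org.apache.spark.SparkEnv
import org.apache.spark.metrics.source.RedisSinkMetricSource

object LazySourceRegistration {
  // one lock per JVM: tasks on the same executor register the source at most once
  private object InitLock

  lazy val metricSource: Option[RedisSinkMetricSource] = {
    InitLock.synchronized {
      // idempotent: only the first caller actually registers the source
      if (SparkEnv.get.metricsSystem.getSourcesByName(RedisSinkMetricSource.sourceName).isEmpty) {
        SparkEnv.get.metricsSystem.registerSource(new RedisSinkMetricSource)
      }
    }
    // after registration the lookup should yield exactly one instance
    SparkEnv.get.metricsSystem.getSourcesByName(RedisSinkMetricSource.sourceName) match {
      case Seq(source) => Some(source.asInstanceOf[RedisSinkMetricSource])
      case _           => None
    }
  }
}
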
"keep tags part in the name's end" in new Scope { + "Statsd reporter" should "keep tags part in the message's end" in new Scope { reporter.report( gauges = Collections.emptySortedMap(), counters = Collections.emptySortedMap(), histograms = new util.TreeMap( Map( - "test#fs=name" -> histogram((1 to 100)) + "prefix.1111.test#fs=name,job=aaa" -> histogram((1 to 100)) ).asJava ), meters = Collections.emptySortedMap(), timers = Collections.emptySortedMap() ) - server.receive should contain("test.p95#fs=name:95.95|ms") + server.receive should contain("prefix.test.p95:95.95|ms|#fs:name,job:aaa") } }