lazy metric registering & metric name filtering (#1195)
Signed-off-by: Oleksii Moskalenko <[email protected]>
pyalex authored Nov 24, 2020
1 parent 60b568d commit 542e7d7
Showing 5 changed files with 46 additions and 19 deletions.

@@ -41,11 +41,7 @@ trait BasePipeline {
       case Some(c: StatsDConfig) =>
         conf
-          .set(
-            "spark.metrics.conf.*.source.redis.class",
-            "org.apache.spark.metrics.source.RedisSinkMetricSource"
-          )
           .set(
-            "spark.metrics.conf.*.source.redis.labels",
+            "spark.metrics.labels",
             s"feature_table=${jobConfig.featureTable.name}"
           )
           .set(
@@ -56,7 +52,7 @@ trait BasePipeline {
         .set("spark.metrics.conf.*.sink.statsd.port", c.port.toString)
         .set("spark.metrics.conf.*.sink.statsd.period", "30")
         .set("spark.metrics.conf.*.sink.statsd.unit", "seconds")
-        .set("spark.metrics.namespace", jobConfig.mode.toString)
+        .set("spark.metrics.namespace", jobConfig.mode.toString.toLowerCase)
       case None => ()
     }

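The net effect in BasePipeline: the Redis source is no longer registered through the metrics config (registration now happens lazily in RedisSinkRelation, below), the labels collapse into a single spark.metrics.labels property, and the namespace is lowercased. A minimal sketch of the resulting settings, with hypothetical host, port, and feature-table values; the real values come from jobConfig and StatsDConfig, and the statsd sink class wiring is omitted:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.metrics.labels", "feature_table=driver_features")
  .set("spark.metrics.conf.*.sink.statsd.host", "localhost")
  .set("spark.metrics.conf.*.sink.statsd.port", "8125")
  .set("spark.metrics.conf.*.sink.statsd.period", "30")
  .set("spark.metrics.conf.*.sink.statsd.unit", "seconds")
  .set("spark.metrics.namespace", "online") // jobConfig.mode.toString.toLowerCase in the real code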

@@ -169,19 +169,28 @@ class StatsdReporterWithTags(
     reportMetered(name, timer)
   }
 
+  private val nameWithTag = """(\S+)#(\S+)""".r
+
   private def send(name: String, value: String, metricType: String)(implicit
       socket: DatagramSocket
   ): Unit = {
-    val bytes = sanitize(s"$name:$value|$metricType").getBytes(UTF_8)
+    val bytes = name match {
+      case nameWithTag(name, tags) =>
+        val tagsWithSemicolon = tags.replace('=', ':')
+        sanitize(s"$name:$value|$metricType|#$tagsWithSemicolon").getBytes(UTF_8)
+      case _ =>
+        sanitize(s"$name:$value|$metricType").getBytes(UTF_8)
+    }
     val packet = new DatagramPacket(bytes, bytes.length, address)
     socket.send(packet)
   }
 
-  private val nameWithTag = """(\S+)#(\S+)""".r
-
   private def fullName(name: String, suffixes: String*): String = name match {
     case nameWithTag(name, tags) =>
-      MetricRegistry.name(prefix, name +: suffixes: _*) ++ "#" ++ tags
+      // filter out parts that consist only of digits,
+      // which could be an executor id, for example
+      val stableName = name.split('.').filterNot(_ forall Character.isDigit).mkString(".")
+      MetricRegistry.name(prefix, stableName +: suffixes: _*) ++ "#" ++ tags
     case _ =>
       MetricRegistry.name(prefix, name +: suffixes: _*)
   }
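Taken together, fullName and send now emit DogStatsD-style datagrams: the "#tags" suffix is split off, purely numeric name segments (such as an executor id) are dropped so names stay stable across executors, and tags are re-attached after the payload with '=' rewritten to ':'. A self-contained sketch of the same transformation, where MetricRegistry.name is approximated by a plain dot-join and prefix is passed explicitly:

object NameWithTagDemo extends App {
  private val nameWithTag = """(\S+)#(\S+)""".r

  // Mirror of fullName: drop purely numeric segments, keep the "#tags" suffix.
  def fullName(prefix: String, name: String, suffixes: String*): String = name match {
    case nameWithTag(base, tags) =>
      val stableName = base.split('.').filterNot(_ forall Character.isDigit).mkString(".")
      (Seq(prefix, stableName) ++ suffixes).mkString(".") ++ "#" ++ tags
    case _ =>
      (Seq(prefix, name) ++ suffixes).mkString(".")
  }

  // Mirror of send's formatting: tags move behind the value, '=' becomes ':'.
  def datagram(name: String, value: String, metricType: String): String = name match {
    case nameWithTag(base, tags) => s"$base:$value|$metricType|#${tags.replace('=', ':')}"
    case _                       => s"$name:$value|$metricType"
  }

  val full = fullName("prefix", "1111.test#fs=name,job=aaa", "p95")
  println(full)                          // prefix.test.p95#fs=name,job=aaa
  println(datagram(full, "95.95", "ms")) // prefix.test.p95:95.95|ms|#fs:name,job:aaa
}

The second println reproduces exactly the datagram asserted in the updated StatsReporterSpec at the end of this commit.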

@@ -48,6 +48,9 @@ class RedisSinkRelation(override val sqlContext: SQLContext, config: SparkRedisC
     extends BaseRelation
     with InsertableRelation
     with Serializable {
+
+  import RedisSinkRelation._
+
   private implicit val redisConfig: RedisConfig = {
     new RedisConfig(
       new RedisEndpoint(sqlContext.sparkContext.getConf)
@@ -151,11 +154,21 @@ class RedisSinkRelation(override val sqlContext: SQLContext, config: SparkRedisC
       .build
   }
 
-  private lazy val metricSource: Option[RedisSinkMetricSource] =
+  private lazy val metricSource: Option[RedisSinkMetricSource] = {
+    MetricInitializationLock.synchronized {
+      // RedisSinkMetricSource needs to be registered on the executor, and SparkEnv must already exist.
+      // That is problematic, since the metrics system is initialized before SparkEnv is set;
+      // that is why source registration was moved here.
+      if (SparkEnv.get.metricsSystem.getSourcesByName(RedisSinkMetricSource.sourceName).isEmpty) {
+        SparkEnv.get.metricsSystem.registerSource(new RedisSinkMetricSource)
+      }
+    }
+
     SparkEnv.get.metricsSystem.getSourcesByName(RedisSinkMetricSource.sourceName) match {
       case Seq(head) => Some(head.asInstanceOf[RedisSinkMetricSource])
       case _         => None
     }
+  }
 
   private def groupKeysByNode(
       nodes: Array[RedisNode],
@@ -202,3 +215,7 @@ class RedisSinkRelation(override val sqlContext: SQLContext, config: SparkRedisC
 
   }
 }
+
+object RedisSinkRelation {
+  object MetricInitializationLock
+}
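The companion object gives every task in the executor JVM the same monitor, so the first write registers RedisSinkMetricSource exactly once and later writes only look it up. A minimal sketch of the pattern, detached from Spark's MetricsSystem (all names here are illustrative):

import scala.collection.mutable

object LazyRegistrationDemo {
  object MetricInitializationLock // one monitor per JVM, as in RedisSinkRelation

  private val registered = mutable.Map.empty[String, AnyRef]

  // The first caller creates and registers the source; concurrent callers
  // block on the lock and then see the already-registered instance.
  def getOrRegister(name: String)(create: => AnyRef): AnyRef =
    MetricInitializationLock.synchronized {
      registered.getOrElseUpdate(name, create)
    }
}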

@@ -17,23 +17,28 @@
 package org.apache.spark.metrics.source
 
 import com.codahale.metrics.MetricRegistry
-import org.apache.spark.{SparkConf, SparkEnv}
+import org.apache.spark.SparkEnv
 
 class RedisSinkMetricSource extends Source {
   override val sourceName: String = RedisSinkMetricSource.sourceName
 
   override val metricRegistry: MetricRegistry = new MetricRegistry
 
-  private val sparkConfig = Option(SparkEnv.get).map(_.conf).getOrElse(new SparkConf(true))
+  private val sparkConfig = SparkEnv.get.conf
 
-  private val metricLabels = sparkConfig.get("spark.metrics.conf.*.source.redis.labels")
+  private val metricLabels = sparkConfig.get("spark.metrics.labels", "")
 
-  private def nameWithLabels(name: String) =
+  private val appId = sparkConfig.get("spark.app.id", "")
+
+  private val executorId = sparkConfig.get("spark.executor.id", "")
+
+  private def nameWithLabels(name: String) = {
     if (metricLabels.isEmpty) {
       name
     } else {
-      s"$name#$metricLabels"
+      s"$name#$metricLabels,job_id=$appId-$executorId"
     }
+  }
 
   val METRIC_TOTAL_ROWS_INSERTED =
     metricRegistry.counter(nameWithLabels("feast_ingestion_feature_row_ingested_count"))
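The source now reads the shared spark.metrics.labels property with an empty default (so it no longer throws when the property is unset, unlike the old parameterless SparkConf.get) and appends a job_id tag built from the application and executor ids. A sketch of the resulting metric name, with hypothetical ids:

object NameWithLabelsDemo extends App {
  // Hypothetical values; the real ones are read from SparkConf
  // ("spark.metrics.labels", "spark.app.id", "spark.executor.id").
  val metricLabels = "feature_table=driver_features"
  val appId        = "app-20201124120000-0001"
  val executorId   = "1"

  def nameWithLabels(name: String): String =
    if (metricLabels.isEmpty) name
    else s"$name#$metricLabels,job_id=$appId-$executorId"

  println(nameWithLabels("feast_ingestion_feature_row_ingested_count"))
  // feast_ingestion_feature_row_ingested_count#feature_table=driver_features,job_id=app-20201124120000-0001-1
}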

@@ -89,19 +89,19 @@ class StatsReporterSpec extends UnitSpec {
     server.receive should contain("test:0|g")
   }
 
-  "Statsd reporter" should "keep tags part in the name's end" in new Scope {
+  "Statsd reporter" should "keep tags part in the message's end" in new Scope {
     reporter.report(
       gauges = Collections.emptySortedMap(),
       counters = Collections.emptySortedMap(),
       histograms = new util.TreeMap(
         Map(
-          "test#fs=name" -> histogram((1 to 100))
+          "prefix.1111.test#fs=name,job=aaa" -> histogram((1 to 100))
         ).asJava
       ),
       meters = Collections.emptySortedMap(),
       timers = Collections.emptySortedMap()
     )
 
-    server.receive should contain("test.p95#fs=name:95.95|ms")
+    server.receive should contain("prefix.test.p95:95.95|ms|#fs:name,job:aaa")
   }
 }
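The Scope fixture and its server helper are defined elsewhere in the spec; presumably server collects the UDP datagrams the reporter emits. An illustrative stand-in for such a helper (not the spec's actual one) could look like this:

import java.net.{DatagramPacket, DatagramSocket}
import java.nio.charset.StandardCharsets.UTF_8

// Binds an ephemeral local UDP port and returns one datagram's payload as text.
class UdpCapture {
  private val socket = new DatagramSocket() // picks a free local port
  def port: Int = socket.getLocalPort

  def receiveOne(): String = {
    val buf    = new Array[Byte](8192)
    val packet = new DatagramPacket(buf, buf.length)
    socket.receive(packet) // blocks until a datagram arrives
    new String(packet.getData, packet.getOffset, packet.getLength, UTF_8)
  }

  def close(): Unit = socket.close()
}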
