Skip to content

Commit

Permalink
atlas cloudwatch: Rework publish routing config. (Netflix-Skunkworks#441)
Browse files Browse the repository at this point in the history

This allows us to route by region as well as account. Necessary for the
IEP stacks and maybe others in the future.
  • Loading branch information
manolama committed Oct 25, 2023
1 parent c12e3f9 commit 47b8312
Show file tree
Hide file tree
Showing 3 changed files with 156 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ package com.netflix.atlas.cloudwatch

import akka.actor.ActorSystem
import com.netflix.atlas.akka.AkkaHttpClient
import com.netflix.atlas.cloudwatch.PublishRouter.defaultKey
import com.netflix.iep.config.NetflixEnvironment
import com.netflix.spectator.api.Registry
import com.typesafe.config.Config
import com.typesafe.scalalogging.StrictLogging
Expand All @@ -42,29 +44,70 @@ class PublishRouter(
config.getConfig("atlas.cloudwatch.account.routing"),
registry,
"main",
baseURI.replaceAll("\\$\\{STACK\\}", "main"),
baseURI
.replaceAll("\\$\\{STACK\\}", "main")
.replaceAll("\\$\\{REGION}", NetflixEnvironment.region()),
httpClient,
schedulers
)

private[cloudwatch] val accountMap = config
.getConfig("atlas.cloudwatch.account.routing.stackMap")
.entrySet()
.asScala
.flatMap { e =>
e.getValue.unwrapped().asInstanceOf[java.util.List[String]].asScala.map { acct =>
val queue = new PublishQueue(
config.getConfig("atlas.cloudwatch.account.routing"),
registry,
e.getKey,
baseURI.replaceAll("\\$\\{STACK\\}", e.getKey),
httpClient,
schedulers
)
acct -> queue
// acct, region, queue
private[cloudwatch] val accountMap: Map[String, Map[String, PublishQueue]] = {
var accounts = Map.empty[String, Map[String, PublishQueue]]
config
.getConfigList("atlas.cloudwatch.account.routing.routes")
.asScala
.foreach { cfg =>
val stack = cfg.getString("stack")
cfg
.getConfigList("accounts")
.asScala
.foreach { c =>
val account = c.getString("account")
if (accounts.contains(account)) {
throw new IllegalArgumentException(
s"Account ${account} can only appear once in the config."
)
}
// region, queue
var routes = Map.empty[String, PublishQueue]
if (c.hasPath("routing")) {
routes = c
.getConfig("routing")
.entrySet()
.asScala
.map { r =>
val destination = r.getValue.unwrapped().toString
r.getKey -> new PublishQueue(
config.getConfig("atlas.cloudwatch.account.routing"),
registry,
destination,
baseURI
.replaceAll("\\$\\{STACK\\}", stack)
.replaceAll("\\$\\{REGION}", destination),
httpClient,
schedulers
)
}
.toMap
}

routes += defaultKey -> new PublishQueue(
config.getConfig("atlas.cloudwatch.account.routing"),
registry,
NetflixEnvironment.region(),
baseURI
.replaceAll("\\$\\{STACK\\}", stack)
.replaceAll("\\$\\{REGION}", NetflixEnvironment.region()),
httpClient,
schedulers
)

accounts += account -> routes
}
}
}
.toMap
accounts
}
logger.info(s"Loaded ${accountMap.size} accounts plus main.")

/**
Expand All @@ -75,18 +118,33 @@ class PublishRouter(
*/
def publish(datapoint: AtlasDatapoint): Unit = {
val formatted = tagger.fixTags(datapoint)
formatted.tags.get("nf.account") match {
getQueue(formatted) match {
case Some(queue) => queue.enqueue(formatted)
case None => missingAccount.increment()
}
}

private[cloudwatch] def getQueue(datapoint: AtlasDatapoint): Option[PublishQueue] = {
datapoint.tags.get("nf.account") match {
case Some(account) =>
accountMap.get(account) match {
case Some(queue) => queue.enqueue(formatted)
case None => mainQueue.enqueue(formatted)
case Some(regionMap) =>
val region = datapoint.tags.get("nf.region").getOrElse(defaultKey)
regionMap.get(region) match {
case Some(queue) => Some(queue)
case None => regionMap.get(defaultKey)
}
case None => Some(mainQueue)
}
case None =>
missingAccount.increment()
case None => None
}
}

def shutdown(): Unit = {
def shutdown: Unit = {
schedulers.shutdownNow()
}
}

/**
  * Companion holding constants shared by the router and its tests.
  */
object PublishRouter {

  /**
    * Key under which every configured account stores its fallback queue in the per-account
    * region map. It is also the region looked up when a datapoint carries no `nf.region` tag
    * or names a region without an explicit route.
    */
  private[cloudwatch] val defaultKey: String = "_DEFAULT"
}
35 changes: 25 additions & 10 deletions atlas-cloudwatch/src/test/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -44,17 +44,32 @@ atlas {
]
}
routing {
uri = "https://publish-${STACK}.foo.com/api/v1/publish"
uri = "https://publish-${STACK}.${REGION}.foo.com/api/v1/publish"
default = "main"
stackMap = {
stackA = [
"1",
"2"
],
stackB = [
"3"
]
}
# Publish routes. Each entry names a stack (substituted for ${STACK} in `uri`) and the
# accounts whose datapoints are published to that stack.
# An account may appear only once across all routes; the router throws on duplicates.
routes = [
{
stack = "stackA"
accounts = [
{
account = "1"
# Optional per-region overrides: the key is matched against the datapoint's nf.region
# tag, the value is substituted for ${REGION} in `uri`.
routing = {
"us-west-1" = "us-west-1"
}
},
{
# No `routing` block: every datapoint for this account uses the account's default queue.
account = "2"
}
]
},
{
stack = "stackB"
accounts = [
{
# No `routing` block: every datapoint for this account uses the account's default queue.
account = "3"
}
]
}
]
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,67 +18,78 @@ package com.netflix.atlas.cloudwatch
import akka.actor.ActorSystem
import akka.testkit.TestKitBase
import com.netflix.atlas.akka.AkkaHttpClient
import com.netflix.atlas.cloudwatch.CloudWatchMetricsProcessorSuite.timestamp
import com.netflix.atlas.core.model.Datapoint
import com.netflix.spectator.api.DefaultRegistry
import com.netflix.spectator.api.Registry
import com.typesafe.config.ConfigFactory
import munit.FunSuite
import org.mockito.MockitoSugar.mock

import scala.jdk.CollectionConverters.CollectionHasAsScala

class PublishRouterSuite extends FunSuite with TestKitBase {

override implicit def system: ActorSystem = ActorSystem(getClass.getSimpleName)

var registry: Registry = null
var registry: Registry = new DefaultRegistry()
val config = ConfigFactory.load()
val tagger = new NetflixTagger(config.getConfig("atlas.cloudwatch.tagger"))
val httpClient = mock[AkkaHttpClient]

override def beforeEach(context: BeforeEach): Unit = {
registry = new DefaultRegistry()
}
val router = new PublishRouter(config, registry, tagger, httpClient)

test("initialize") {
val router = new PublishRouter(config, registry, tagger, httpClient)
assertEquals(router.mainQueue.uri, "https://publish-main.foo.com/api/v1/publish")
assertEquals(router.mainQueue.uri, "https://publish-main.us-east-1.foo.com/api/v1/publish")
assertEquals(router.accountMap.size, 3)
assertEquals(
router.accountMap.get("1").get.uri,
"https://publish-stackA.foo.com/api/v1/publish"
)
assertEquals(
router.accountMap.get("2").get.uri,
"https://publish-stackA.foo.com/api/v1/publish"
)
assertEquals(
router.accountMap.get("3").get.uri,
"https://publish-stackB.foo.com/api/v1/publish"
)
router.shutdown()
router.shutdown
}

// NOTE: These are flaky. Either we need a queue factory that we can inject or figure out some way to pause the
// Akka system. For now, routing is simple enough I'm not worried about testing.
// test("publish main") {
// val dp = Datapoint(Map("nf.account" -> "42"), timestamp, 42.0)
// val router = spy(new PublishRouter(config, registry, tagger, httpClient))
// router.shutdown()
// router.publish(dp)
// assertEquals(router.mainQueue.publishQueue.size, 1)
// }
//
// test("publish stackA") {
// val dp = Datapoint(Map("nf.account" -> "1"), timestamp, 42.0)
// val router = new PublishRouter(config, registry, tagger, httpClient)
// router.shutdown()
// router.publish(dp)
// assertEquals(router.accountMap.get("1").get.publishQueue.size, 1)
// }
// Account "42" is not listed under atlas.cloudwatch.account.routing.routes, so the router is
// expected to hand back the main queue. The "us-east-1" in the expected URI is presumably the
// region NetflixEnvironment reports in the test environment — TODO confirm.
test("publish main same region") {
  val tags = Map("nf.account" -> "42", "nf.region" -> "us-east-1")
  val resolved = router.getQueue(Datapoint(tags, timestamp, 42.0))
  assertEquals(resolved.get.uri, "https://publish-main.us-east-1.foo.com/api/v1/publish")
  // NOTE(review): `router` is the suite-wide instance, so this stops its schedulers for
  // every test that runs afterwards — confirm that is intended.
  router.shutdown
}

// Same unconfigured account "42", but tagged with a region that differs from the one in the
// expected URI: for accounts without a route the region tag is not consulted and the main
// queue is returned.
test("publish main same any region") {
  val tags = Map("nf.account" -> "42", "nf.region" -> "ap-south-1")
  val resolved = router.getQueue(Datapoint(tags, timestamp, 42.0))
  assertEquals(resolved.get.uri, "https://publish-main.us-east-1.foo.com/api/v1/publish")
  router.shutdown
}

// Account "1" belongs to stackA and only has an explicit route for us-west-1, so a datapoint
// tagged us-east-1 must fall back to the account's default queue.
test("publish stackA acct 1 default") {
  val tags = Map("nf.account" -> "1", "nf.region" -> "us-east-1")
  val resolved = router.getQueue(Datapoint(tags, timestamp, 42.0))
  assertEquals(resolved.get.uri, "https://publish-stackA.us-east-1.foo.com/api/v1/publish")
  router.shutdown
}

// Account "2" belongs to stackA and has no per-region routing at all, so it always resolves
// to the account's default queue.
test("publish stackA acct 2 default") {
  val tags = Map("nf.account" -> "2", "nf.region" -> "us-east-1")
  val resolved = router.getQueue(Datapoint(tags, timestamp, 42.0))
  assertEquals(resolved.get.uri, "https://publish-stackA.us-east-1.foo.com/api/v1/publish")
  router.shutdown
}

// Account "1" has an explicit "us-west-1" route in the test config, so the region tag selects
// the queue whose URI carries that destination region instead of the default one.
test("publish stackA us-west") {
  val tags = Map("nf.account" -> "1", "nf.region" -> "us-west-1")
  val resolved = router.getQueue(Datapoint(tags, timestamp, 42.0))
  assertEquals(resolved.get.uri, "https://publish-stackA.us-west-1.foo.com/api/v1/publish")
  router.shutdown
}

// Despite the us-west-1 tag, account "3" (stackB) has no per-region routing in the test
// config, so this checks the fallback: the account's default queue, whose URI does not use
// the datapoint's region.
test("publish stackB us-west") {
  val tags = Map("nf.account" -> "3", "nf.region" -> "us-west-1")
  val resolved = router.getQueue(Datapoint(tags, timestamp, 42.0))
  assertEquals(resolved.get.uri, "https://publish-stackB.us-east-1.foo.com/api/v1/publish")
  router.shutdown
}

test("publish missing account tag") {
val dp = Datapoint(Map("no" -> "account"), 1677628800000L, 42.0)
val router = new PublishRouter(config, registry, tagger, httpClient)
router.shutdown()
router.shutdown
router.publish(dp)
assertEquals(
registry.counter("atlas.cloudwatch.queue.dps.dropped", "reason", "missingAccount").count(),
Expand Down

0 comments on commit 47b8312

Please sign in to comment.