Skip to content

Commit

Permalink
atlas cloudwatch: Rework publish routing config. (#441)
Browse files Browse the repository at this point in the history
This allows us to route by region as well as account. Necessary for the
IEP stacks and maybe others in the future.
  • Loading branch information
manolama authored Apr 5, 2023
1 parent 1e8461f commit 7288381
Show file tree
Hide file tree
Showing 3 changed files with 156 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ package com.netflix.atlas.cloudwatch

import akka.actor.ActorSystem
import com.netflix.atlas.akka.AkkaHttpClient
import com.netflix.atlas.cloudwatch.PublishRouter.defaultKey
import com.netflix.iep.config.NetflixEnvironment
import com.netflix.spectator.api.Registry
import com.typesafe.config.Config
import com.typesafe.scalalogging.StrictLogging
Expand All @@ -42,29 +44,70 @@ class PublishRouter(
config.getConfig("atlas.cloudwatch.account.routing"),
registry,
"main",
baseURI.replaceAll("\\$\\{STACK\\}", "main"),
baseURI
.replaceAll("\\$\\{STACK\\}", "main")
.replaceAll("\\$\\{REGION}", NetflixEnvironment.region()),
httpClient,
schedulers
)

private[cloudwatch] val accountMap = config
.getConfig("atlas.cloudwatch.account.routing.stackMap")
.entrySet()
.asScala
.flatMap { e =>
e.getValue.unwrapped().asInstanceOf[java.util.List[String]].asScala.map { acct =>
val queue = new PublishQueue(
config.getConfig("atlas.cloudwatch.account.routing"),
registry,
e.getKey,
baseURI.replaceAll("\\$\\{STACK\\}", e.getKey),
httpClient,
schedulers
)
acct -> queue
// acct, region, queue
private[cloudwatch] val accountMap: Map[String, Map[String, PublishQueue]] = {
var accounts = Map.empty[String, Map[String, PublishQueue]]
config
.getConfigList("atlas.cloudwatch.account.routing.routes")
.asScala
.foreach { cfg =>
val stack = cfg.getString("stack")
cfg
.getConfigList("accounts")
.asScala
.foreach { c =>
val account = c.getString("account")
if (accounts.contains(account)) {
throw new IllegalArgumentException(
s"Account ${account} can only appear once in the config."
)
}
// region, queue
var routes = Map.empty[String, PublishQueue]
if (c.hasPath("routing")) {
routes = c
.getConfig("routing")
.entrySet()
.asScala
.map { r =>
val destination = r.getValue.unwrapped().toString
r.getKey -> new PublishQueue(
config.getConfig("atlas.cloudwatch.account.routing"),
registry,
destination,
baseURI
.replaceAll("\\$\\{STACK\\}", stack)
.replaceAll("\\$\\{REGION}", destination),
httpClient,
schedulers
)
}
.toMap
}

routes += defaultKey -> new PublishQueue(
config.getConfig("atlas.cloudwatch.account.routing"),
registry,
NetflixEnvironment.region(),
baseURI
.replaceAll("\\$\\{STACK\\}", stack)
.replaceAll("\\$\\{REGION}", NetflixEnvironment.region()),
httpClient,
schedulers
)

accounts += account -> routes
}
}
}
.toMap
accounts
}
logger.info(s"Loaded ${accountMap.size} accounts plus main.")

/**
Expand All @@ -75,18 +118,33 @@ class PublishRouter(
*/
def publish(datapoint: AtlasDatapoint): Unit = {
val formatted = tagger.fixTags(datapoint)
formatted.tags.get("nf.account") match {
getQueue(formatted) match {
case Some(queue) => queue.enqueue(formatted)
case None => missingAccount.increment()
}
}

private[cloudwatch] def getQueue(datapoint: AtlasDatapoint): Option[PublishQueue] = {
datapoint.tags.get("nf.account") match {
case Some(account) =>
accountMap.get(account) match {
case Some(queue) => queue.enqueue(formatted)
case None => mainQueue.enqueue(formatted)
case Some(regionMap) =>
val region = datapoint.tags.get("nf.region").getOrElse(defaultKey)
regionMap.get(region) match {
case Some(queue) => Some(queue)
case None => regionMap.get(defaultKey)
}
case None => Some(mainQueue)
}
case None =>
missingAccount.increment()
case None => None
}
}

def shutdown(): Unit = {
def shutdown: Unit = {
schedulers.shutdownNow()
}
}

object PublishRouter {
private[cloudwatch] val defaultKey = "_DEFAULT"
}
35 changes: 25 additions & 10 deletions atlas-cloudwatch/src/test/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -44,17 +44,32 @@ atlas {
]
}
routing {
uri = "https://publish-${STACK}.foo.com/api/v1/publish"
uri = "https://publish-${STACK}.${REGION}.foo.com/api/v1/publish"
default = "main"
stackMap = {
stackA = [
"1",
"2"
],
stackB = [
"3"
]
}
routes = [
{
stack = "stackA"
accounts = [
{
account = "1"
routing = {
"us-west-1" = "us-west-1"
}
},
{
account = "2"
}
]
},
{
stack = "stackB"
accounts = [
{
account = "3"
}
]
}
]
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,67 +18,78 @@ package com.netflix.atlas.cloudwatch
import akka.actor.ActorSystem
import akka.testkit.TestKitBase
import com.netflix.atlas.akka.AkkaHttpClient
import com.netflix.atlas.cloudwatch.CloudWatchMetricsProcessorSuite.timestamp
import com.netflix.atlas.core.model.Datapoint
import com.netflix.spectator.api.DefaultRegistry
import com.netflix.spectator.api.Registry
import com.typesafe.config.ConfigFactory
import munit.FunSuite
import org.mockito.MockitoSugar.mock

import scala.jdk.CollectionConverters.CollectionHasAsScala

class PublishRouterSuite extends FunSuite with TestKitBase {

override implicit def system: ActorSystem = ActorSystem(getClass.getSimpleName)

var registry: Registry = null
var registry: Registry = new DefaultRegistry()
val config = ConfigFactory.load()
val tagger = new NetflixTagger(config.getConfig("atlas.cloudwatch.tagger"))
val httpClient = mock[AkkaHttpClient]

override def beforeEach(context: BeforeEach): Unit = {
registry = new DefaultRegistry()
}
val router = new PublishRouter(config, registry, tagger, httpClient)

test("initialize") {
val router = new PublishRouter(config, registry, tagger, httpClient)
assertEquals(router.mainQueue.uri, "https://publish-main.foo.com/api/v1/publish")
assertEquals(router.mainQueue.uri, "https://publish-main.us-east-1.foo.com/api/v1/publish")
assertEquals(router.accountMap.size, 3)
assertEquals(
router.accountMap.get("1").get.uri,
"https://publish-stackA.foo.com/api/v1/publish"
)
assertEquals(
router.accountMap.get("2").get.uri,
"https://publish-stackA.foo.com/api/v1/publish"
)
assertEquals(
router.accountMap.get("3").get.uri,
"https://publish-stackB.foo.com/api/v1/publish"
)
router.shutdown()
router.shutdown
}

// NOTE: These are flaky. Either we need a queue factory that we can inject or figure out some way to pause the
// Akka system. For now, routing is simple enough I'm not worried about testing.
// test("publish main") {
// val dp = Datapoint(Map("nf.account" -> "42"), timestamp, 42.0)
// val router = spy(new PublishRouter(config, registry, tagger, httpClient))
// router.shutdown()
// router.publish(dp)
// assertEquals(router.mainQueue.publishQueue.size, 1)
// }
//
// test("publish stackA") {
// val dp = Datapoint(Map("nf.account" -> "1"), timestamp, 42.0)
// val router = new PublishRouter(config, registry, tagger, httpClient)
// router.shutdown()
// router.publish(dp)
// assertEquals(router.accountMap.get("1").get.publishQueue.size, 1)
// }
test("publish main same region") {
val dp = Datapoint(Map("nf.account" -> "42", "nf.region" -> "us-east-1"), timestamp, 42.0)
val queue = router.getQueue(dp).get
assertEquals(queue.uri, "https://publish-main.us-east-1.foo.com/api/v1/publish")
router.shutdown
}

test("publish main same any region") {
val dp = Datapoint(Map("nf.account" -> "42", "nf.region" -> "ap-south-1"), timestamp, 42.0)
val queue = router.getQueue(dp).get
assertEquals(queue.uri, "https://publish-main.us-east-1.foo.com/api/v1/publish")
router.shutdown
}

test("publish stackA acct 1 default") {
val dp = Datapoint(Map("nf.account" -> "1", "nf.region" -> "us-east-1"), timestamp, 42.0)
val queue = router.getQueue(dp).get
assertEquals(queue.uri, "https://publish-stackA.us-east-1.foo.com/api/v1/publish")
router.shutdown
}

test("publish stackA acct 2 default") {
val dp = Datapoint(Map("nf.account" -> "2", "nf.region" -> "us-east-1"), timestamp, 42.0)
val queue = router.getQueue(dp).get
assertEquals(queue.uri, "https://publish-stackA.us-east-1.foo.com/api/v1/publish")
router.shutdown
}

test("publish stackA us-west") {
val dp = Datapoint(Map("nf.account" -> "1", "nf.region" -> "us-west-1"), timestamp, 42.0)
val queue = router.getQueue(dp).get
assertEquals(queue.uri, "https://publish-stackA.us-west-1.foo.com/api/v1/publish")
router.shutdown
}

test("publish stackB us-west") {
val dp = Datapoint(Map("nf.account" -> "3", "nf.region" -> "us-west-1"), timestamp, 42.0)
val queue = router.getQueue(dp).get
assertEquals(queue.uri, "https://publish-stackB.us-east-1.foo.com/api/v1/publish")
router.shutdown
}

test("publish missing account tag") {
val dp = Datapoint(Map("no" -> "account"), 1677628800000L, 42.0)
val router = new PublishRouter(config, registry, tagger, httpClient)
router.shutdown()
router.shutdown
router.publish(dp)
assertEquals(
registry.counter("atlas.cloudwatch.queue.dps.dropped", "reason", "missingAccount").count(),
Expand Down

0 comments on commit 7288381

Please sign in to comment.