Commit

atlas cloudwatch: Remove actors and settings. (#439)
Switching to the new poller. Also fixes the ordering in the poller so that
the run flag is reset before its promises are completed, fixing flaky tests.
manolama authored Mar 31, 2023
1 parent a0839d8 commit 7994737
Showing 13 changed files with 30 additions and 1,042 deletions.
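
The flaky tests came from the order of the poller's last two completion steps: with the old order, the test promise (`fullRunUt` in the CloudWatchPoller diff below) was completed while the run flag was still set, so a test that awaited the promise and then checked the flag raced with the worker thread. A minimal sketch of that race, using an `AtomicBoolean` run flag and a test promise like the poller's `flag` and `fullRunUt`; the harness itself is hypothetical:

```scala
import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object RunFlagOrdering {

  // Simplified stand-in for the poller: `flag` guards against overlapping
  // runs, `fullRunUt` lets unit tests await the end of a full run.
  final class Poller(fullRunUt: Option[Promise[Unit]]) {
    val flag = new AtomicBoolean(false)

    def poll(fixedOrder: Boolean): Unit = {
      if (!flag.compareAndSet(false, true)) return
      Future {
        // ... fetch and publish CloudWatch data ...
        if (fixedOrder) {
          flag.set(false)                  // reset the flag first,
          fullRunUt.foreach(_.success(())) // then wake up the test
        } else {
          fullRunUt.foreach(_.success(())) // old order: the test wakes up
          flag.set(false)                  // while the flag may still be true
        }
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val promise = Promise[Unit]()
    val poller  = new Poller(Some(promise))
    poller.poll(fixedOrder = false)
    Await.result(promise.future, 5.seconds)
    // With the old order this assertion races with flag.set(false) on the
    // worker thread and fails intermittently; with fixedOrder = true it
    // always passes.
    assert(!poller.flag.get(), "run flag still set after the run completed")
  }
}
```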
69 changes: 0 additions & 69 deletions atlas-cloudwatch/src/main/resources/reference.conf
@@ -1,41 +1,3 @@
-akka.actor.deployment {
-  /poller/poller-cloudwatch/metrics-get {
-    router = "round-robin-pool"
-    nr-of-instances = 100
-    pool-dispatcher {
-      executor = "thread-pool-executor"
-      thread-pool-executor {
-        core-pool-size-min = 100
-        core-pool-size-max = 100
-        core-pool-size-factor = 1.0
-      }
-    }
-  }
-
-  /poller/poller-cloudwatch/metrics-list {
-    router = "round-robin-pool"
-    nr-of-instances = 2
-    pool-dispatcher {
-      executor = "thread-pool-executor"
-      thread-pool-executor {
-        core-pool-size-min = 1
-        core-pool-size-max = 2
-        core-pool-size-factor = 1.0
-      }
-    }
-  }
-}
-
-atlas.akka {
-  actors = ${?atlas.akka.actors} [
-    {
-      name = "poller"
-      class = "com.netflix.atlas.poller.PollerManager"
-    }
-  ]
-}
-
 # A pool used for the blocking Redis Cluster processor calls. Important to avoid
 # tying up the main Akka thread pool.
 redis-io-dispatcher {
@@ -73,37 +35,6 @@ atlas {
     // How often to collect data from the pollers.
     frequency = 1 m
 
-    sink = {
-      class = "com.netflix.atlas.poller.ClientActor"
-
-      // URL for sending the data
-      uri = "http://localhost:7101/api/v1/publish"
-
-      // Maximum number of datapoints to send in a single HTTP request to the publish
-      // endpoint.
-      batch-size = 10000
-
-      // If set to false, then ClientActor will work in a fire and forget mode with no
-      // explicit response message. Setting to true it will send an ack message to indicate
-      // it is done processing the message.
-      send-ack = false
-
-      // Characters that are allowed in metric names
-      valid-tag-characters = "-._A-Za-z0-9"
-
-      // Overrides to be more permissive for the values of some keys
-      valid-tag-value-characters = ${?atlas.poller.valid-tag-value-characters} [
-        {
-          key = "nf.cluster"
-          value = "-._A-Za-z0-9^~"
-        },
-        {
-          key = "nf.asg"
-          value = "-._A-Za-z0-9^~"
-        }
-      ]
-    }
-
     pollers = ${?atlas.poller.pollers} [
       {
         class = "com.netflix.atlas.cloudwatch.CloudWatchPoller"
@@ -207,8 +207,8 @@ class CloudWatchPoller(
           pollTime.record(System.currentTimeMillis() - start, TimeUnit.MILLISECONDS)
           processor.updateLastSuccessfulPoll(nextRun)
           threadPool.shutdown()
-          fullRunUt.map(_.success(runners.result()))
           flag.set(false)
+          fullRunUt.map(_.success(runners.result()))
         case Failure(ex) =>
           logger.error(
             "Failure at some point in polling for CloudWatch data." +
@@ -217,8 +217,8 @@ class CloudWatchPoller(
           )
           pollTime.record(System.currentTimeMillis() - start, TimeUnit.MILLISECONDS)
           threadPool.shutdown()
-          fullRunUt.map(_.failure(ex))
           flag.set(false)
+          fullRunUt.map(_.failure(ex))
         }
         accountsUt.map(_.success(Done))
       } catch {
@@ -228,9 +228,9 @@ class CloudWatchPoller(
             .counter(errorSetup.withTags("exception", ex.getClass.getSimpleName))
             .increment()
           threadPool.shutdown()
+          flag.set(false)
           accountsUt.map(_.failure(ex))
           fullRunUt.map(_.failure(ex))
-          flag.set(false)
         }

case Failure(ex) =>

This file was deleted.

This file was deleted.

@@ -27,9 +27,8 @@ import akka.util.ByteString
 import com.netflix.atlas.akka.AkkaHttpClient
 import com.netflix.atlas.akka.CustomMediaTypes
 import com.netflix.atlas.akka.StreamOps
+import com.netflix.atlas.core.model.Datapoint
 import com.netflix.atlas.json.Json
-import com.netflix.atlas.poller.Messages
-import com.netflix.atlas.poller.Messages.MetricsPayload
 import com.netflix.spectator.api.Id
 import com.netflix.spectator.api.Registry
 import com.typesafe.config.Config
@@ -149,7 +148,7 @@ class PublishQueue(
     response.entity.dataBytes.runReduce(_ ++ _).onComplete {
       case Success(bs) =>
         try {
-          val msg = Json.decode[Messages.FailureResponse](bs.toArray)
+          val msg = Json.decode[FailureResponse](bs.toArray)
           msg.message.headOption.foreach { reason =>
             logger.warn("failed to validate some datapoints, first reason: {}", reason)
           }
@@ -193,3 +192,28 @@ class PublishQueue(
     retryAttempts.increment()
   }
 }
+
+/**
+ * Represents a failure response message from the publish endpoint.
+ *
+ * @param `type`
+ *   Message type. Should always be "error".
+ * @param errorCount
+ *   Number of datapoints that failed validation.
+ * @param message
+ *   Reasons for why datapoints were dropped.
+ */
+case class FailureResponse(`type`: String, errorCount: Int, message: List[String])
+
+/**
+ * Metrics payload that pollers will send back to the manager.
+ *
+ * @param tags
+ *   Common tags that should get added to all metrics in the payload.
+ * @param metrics
+ *   Metrics collected by the poller.
+ */
+case class MetricsPayload(
+  tags: Map[String, String] = Map.empty,
+  metrics: Iterable[Datapoint] = Nil
+)
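
The two case classes above replace the equivalents that previously lived in `com.netflix.atlas.poller.Messages`. A short usage sketch, assuming it compiles alongside the `FailureResponse` and `MetricsPayload` definitions above, the `Json` helper shown in the imports, and a `Datapoint(tags, timestamp, value)` constructor; the sample values are illustrative:

```scala
import com.netflix.atlas.core.model.Datapoint
import com.netflix.atlas.json.Json

object PayloadExamples {

  def main(args: Array[String]): Unit = {
    // Decoding a failure response from the publish endpoint, as PublishQueue
    // does with the response body above. Sample JSON is illustrative.
    val body =
      """{"type":"error","errorCount":2,"message":["too many tags","bad name"]}"""
    val failure = Json.decode[FailureResponse](body)
    failure.message.headOption.foreach { reason =>
      println(s"first reason: $reason")
    }

    // Building a payload for the manager. Datapoint's constructor signature
    // (tags, timestamp, value) is assumed here.
    val payload = MetricsPayload(
      tags = Map("nf.app" -> "atlas_cloudwatch"),
      metrics = List(
        Datapoint(
          Map("name" -> "aws.sqs.approximateNumberOfMessagesVisible"),
          System.currentTimeMillis(),
          42.0
        )
      )
    )
    println(Json.encode(payload))
  }
}
```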
