diff --git a/atlas-cloudwatch/src/main/scala/com/netflix/atlas/cloudwatch/CloudWatchMetricsProcessor.scala b/atlas-cloudwatch/src/main/scala/com/netflix/atlas/cloudwatch/CloudWatchMetricsProcessor.scala index d7d7be6b..04e61640 100644 --- a/atlas-cloudwatch/src/main/scala/com/netflix/atlas/cloudwatch/CloudWatchMetricsProcessor.scala +++ b/atlas-cloudwatch/src/main/scala/com/netflix/atlas/cloudwatch/CloudWatchMetricsProcessor.scala @@ -227,18 +227,23 @@ abstract class CloudWatchMetricsProcessor( protected[cloudwatch] def delete(key: Any): Unit /** + * Returns the last successful poll time. + * @param id + * The non-null unique identifier for the polling config. * @return * The last successful poll time in unix epoch milliseconds. */ - protected[cloudwatch] def lastSuccessfulPoll: Long + protected[cloudwatch] def lastSuccessfulPoll(id: String): Long /** * Updates the last successful poll time. * + * @param id + * The non-null unique identifier for the polling config. * @param timestamp * The unix epoch milliseconds of the last successful poll. */ - protected[cloudwatch] def updateLastSuccessfulPoll(timestamp: Long): Unit + protected[cloudwatch] def updateLastSuccessfulPoll(id: String, timestamp: Long): Unit /** * Inserts the given data point in the proper order of the CloudWatchCloudWatchCacheEntry **AND** expires any old data from diff --git a/atlas-cloudwatch/src/main/scala/com/netflix/atlas/cloudwatch/CloudWatchPoller.scala b/atlas-cloudwatch/src/main/scala/com/netflix/atlas/cloudwatch/CloudWatchPoller.scala index da75a2cb..29e57b00 100644 --- a/atlas-cloudwatch/src/main/scala/com/netflix/atlas/cloudwatch/CloudWatchPoller.scala +++ b/atlas-cloudwatch/src/main/scala/com/netflix/atlas/cloudwatch/CloudWatchPoller.scala @@ -130,7 +130,7 @@ class CloudWatchPoller( accountsUt: Option[Promise[Done]] = None ): Unit = { if (!leaderStatus.hasLeadership) { - logger.info("Not the leader, skipping CloudWatch polling.") + logger.info(s"Not the leader for ${offset}s, skipping CloudWatch polling.") fullRunUt.map(_.success(List.empty)) return } @@ -138,11 +138,11 @@ class CloudWatchPoller( // see if we've past the next run time. var nextRun = 0L try { - val previousRun = processor.lastSuccessfulPoll + val previousRun = processor.lastSuccessfulPoll(offset.toString) nextRun = runAfter(offset, periodFilter) if (previousRun >= nextRun) { logger.info( - s"Skipping CloudWatch polling as we're within the polling interval. Previous ${previousRun}. Next ${nextRun}" + s"Skipping CloudWatch polling for ${offset}s as we're within the polling interval. Previous ${previousRun}. Next ${nextRun}" ) fullRunUt.map(_.success(List.empty)) return @@ -153,7 +153,7 @@ class CloudWatchPoller( } } catch { case ex: Exception => - logger.error("Unexpected exception checking for the last poll timestamp", ex) + logger.error(s"Unexpected exception checking for the last poll timestamp on ${offset}s", ex) return } @@ -205,7 +205,7 @@ class CloudWatchPoller( s"Finished CloudWatch polling with ${got} of ${expecting} metrics in ${(System.currentTimeMillis() - start) / 1000.0} s" ) pollTime.record(System.currentTimeMillis() - start, TimeUnit.MILLISECONDS) - processor.updateLastSuccessfulPoll(nextRun) + processor.updateLastSuccessfulPoll(offset.toString, nextRun) threadPool.shutdown() flag.set(false) fullRunUt.map(_.success(runners.result())) diff --git a/atlas-cloudwatch/src/main/scala/com/netflix/atlas/cloudwatch/ConfigAccountSupplier.scala b/atlas-cloudwatch/src/main/scala/com/netflix/atlas/cloudwatch/ConfigAccountSupplier.scala index f40d8533..116b5670 100644 --- a/atlas-cloudwatch/src/main/scala/com/netflix/atlas/cloudwatch/ConfigAccountSupplier.scala +++ b/atlas-cloudwatch/src/main/scala/com/netflix/atlas/cloudwatch/ConfigAccountSupplier.scala @@ -16,6 +16,7 @@ package com.netflix.atlas.cloudwatch import com.typesafe.config.Config +import com.typesafe.scalalogging.StrictLogging import software.amazon.awssdk.regions.Region import scala.concurrent.ExecutionContext.Implicits.global @@ -25,7 +26,7 @@ import scala.jdk.CollectionConverters.CollectionHasAsScala /** * Simple Typesafe config based AWS account supplier. Used for testing and in place of a supplier using internal tooling. */ -class ConfigAccountSupplier(config: Config) extends AwsAccountSupplier { +class ConfigAccountSupplier(config: Config) extends AwsAccountSupplier with StrictLogging { private[cloudwatch] val defaultRegions = if (config.hasPath("atlas.cloudwatch.account.polling.default-regions")) @@ -48,6 +49,7 @@ class ConfigAccountSupplier(config: Config) extends AwsAccountSupplier { c.getString("account") -> regions } .toMap + logger.debug(s"Loaded accounts: ${map}") /** * @return The non-null list of account IDs to poll for CloudWatch metrics. diff --git a/atlas-cloudwatch/src/main/scala/com/netflix/atlas/cloudwatch/LocalCloudWatchMetricsProcessor.scala b/atlas-cloudwatch/src/main/scala/com/netflix/atlas/cloudwatch/LocalCloudWatchMetricsProcessor.scala index f1a68de2..5e29028b 100644 --- a/atlas-cloudwatch/src/main/scala/com/netflix/atlas/cloudwatch/LocalCloudWatchMetricsProcessor.scala +++ b/atlas-cloudwatch/src/main/scala/com/netflix/atlas/cloudwatch/LocalCloudWatchMetricsProcessor.scala @@ -54,7 +54,7 @@ class LocalCloudWatchMetricsProcessor( // writeTS, expSec, data private val cache = new ConcurrentHashMap[Long, (Long, Long, Array[Byte])] - private val lastPoll = new AtomicLong() + private val lastPoll = new ConcurrentHashMap[String, AtomicLong] override protected[cloudwatch] def updateCache( datapoint: FirehoseMetric, @@ -96,10 +96,14 @@ class LocalCloudWatchMetricsProcessor( cache.remove(key) } - override protected[cloudwatch] def lastSuccessfulPoll: Long = lastPoll.get() + override protected[cloudwatch] def lastSuccessfulPoll(id: String): Long = { + val lastPoll = this.lastPoll.get(id) + if (lastPoll == null) 0L else lastPoll.get() + } - override protected[cloudwatch] def updateLastSuccessfulPoll(timestamp: Long): Unit = - lastPoll.set(timestamp) + override protected[cloudwatch] def updateLastSuccessfulPoll(id: String, timestamp: Long): Unit = { + lastPoll.computeIfAbsent(id, _ => new AtomicLong(0L)).set(timestamp) + } private[cloudwatch] def inject( key: Long, diff --git a/atlas-cloudwatch/src/main/scala/com/netflix/atlas/cloudwatch/RedisClusterCloudWatchMetricsProcessor.scala b/atlas-cloudwatch/src/main/scala/com/netflix/atlas/cloudwatch/RedisClusterCloudWatchMetricsProcessor.scala index 6e149a38..31f7f7e8 100644 --- a/atlas-cloudwatch/src/main/scala/com/netflix/atlas/cloudwatch/RedisClusterCloudWatchMetricsProcessor.scala +++ b/atlas-cloudwatch/src/main/scala/com/netflix/atlas/cloudwatch/RedisClusterCloudWatchMetricsProcessor.scala @@ -95,7 +95,7 @@ class RedisClusterCloudWatchMetricsProcessor( .count(config.getInt("atlas.redis-cluster.scan.count")) private val commandObjects = new CommandObjects() private val maxBatch = config.getInt("atlas.redis-cluster.batch.size") - private val pollKey = "cw_last_poll".getBytes("UTF-8") + private val pollKey = "cw_last_poll_" private val currentScan = new AtomicReference[Future[Unit]]() private val running = new AtomicBoolean(false) @@ -371,20 +371,20 @@ class RedisClusterCloudWatchMetricsProcessor( } } - override protected[cloudwatch] def lastSuccessfulPoll: Long = { - jedis.get(pollKey) match { + override protected[cloudwatch] def lastSuccessfulPoll(id: String): Long = { + jedis.get((pollKey + id).getBytes("UTF-8")) match { case null => 0 case bytes => ByteBuffer.wrap(bytes).getLong() } } - override protected[cloudwatch] def updateLastSuccessfulPoll(timestamp: Long): Unit = { + override protected[cloudwatch] def updateLastSuccessfulPoll(id: String, timestamp: Long): Unit = { val bytes = new Array[Byte](8) ByteBuffer.wrap(bytes).putLong(timestamp) - jedis.set(pollKey, bytes) match { - case null => logger.error("Failed to set last poll key. Null response.") + jedis.set((pollKey + id).getBytes("UTF-8"), bytes) match { + case null => logger.error(s"Failed to set last poll key ${pollKey + id}. Null response.") case "OK" => // no-op - case unk => logger.error(s"Failed to set last poll key: ${unk}") + case unk => logger.error(s"Failed to set last poll key ${pollKey + id}: ${unk}") } } diff --git a/atlas-cloudwatch/src/test/scala/com/netflix/atlas/cloudwatch/CloudWatchPollerSuite.scala b/atlas-cloudwatch/src/test/scala/com/netflix/atlas/cloudwatch/CloudWatchPollerSuite.scala index daededa5..c14ad2fb 100644 --- a/atlas-cloudwatch/src/test/scala/com/netflix/atlas/cloudwatch/CloudWatchPollerSuite.scala +++ b/atlas-cloudwatch/src/test/scala/com/netflix/atlas/cloudwatch/CloudWatchPollerSuite.scala @@ -30,6 +30,7 @@ import munit.FunSuite import org.mockito.ArgumentMatchers.anyString import org.mockito.ArgumentMatchersSugar.any import org.mockito.ArgumentMatchersSugar.anyInt +import org.mockito.ArgumentMatchersSugar.anyLong import org.mockito.MockitoSugar.mock import org.mockito.MockitoSugar.never import org.mockito.MockitoSugar.times @@ -99,7 +100,7 @@ class CloudWatchPollerSuite extends FunSuite with TestKitBase { when(leaderStatus.hasLeadership).thenReturn(true) when(executorFactory.createFixedPool(anyInt)).thenReturn(threadPool) - when(processor.lastSuccessfulPoll).thenReturn(0L) + when(processor.lastSuccessfulPoll(anyString)).thenReturn(0L) when( clientFactory.getInstance( anyString, @@ -129,16 +130,19 @@ class CloudWatchPollerSuite extends FunSuite with TestKitBase { assertFalse(flag.get) assertCounters() verify(accountSupplier, never).accounts + verify(processor, never).updateLastSuccessfulPoll(anyString, anyLong) } test("poll already ran") { - when(processor.lastSuccessfulPoll).thenReturn(System.currentTimeMillis() + 86_400_000L) + when(processor.lastSuccessfulPoll(anyString)) + .thenReturn(System.currentTimeMillis() + 86_400_000L) val poller = getPoller val flag = new AtomicBoolean() poller.poll(offset, List(getCategory(poller)), flag) assertFalse(flag.get) assertCounters() verify(accountSupplier, never).accounts + verify(processor, never).updateLastSuccessfulPoll(anyString, anyLong) } test("poll already running") { @@ -148,6 +152,7 @@ class CloudWatchPollerSuite extends FunSuite with TestKitBase { assert(flag.get) assertCounters() verify(accountSupplier, never).accounts + verify(processor, never).updateLastSuccessfulPoll(anyString, anyLong) } test("poll success") { @@ -169,6 +174,7 @@ class CloudWatchPollerSuite extends FunSuite with TestKitBase { assertEquals(pollers.size, 1) assertCounters(expected = 2, polled = 2) assertFalse(flag.get) + verify(processor, times(1)).updateLastSuccessfulPoll(anyString, anyLong) } test("poll on list failure") { @@ -196,6 +202,7 @@ class CloudWatchPollerSuite extends FunSuite with TestKitBase { } assertCounters() assertFalse(flag.get) + verify(processor, never).updateLastSuccessfulPoll(anyString, anyLong) } test("poll on client exception") { @@ -222,6 +229,7 @@ class CloudWatchPollerSuite extends FunSuite with TestKitBase { } assertCounters(errors = Map("setup" -> 1)) assertFalse(flag.get) + verify(processor, never).updateLastSuccessfulPoll(anyString, anyLong) } test("poll accounts exception") { @@ -241,6 +249,7 @@ class CloudWatchPollerSuite extends FunSuite with TestKitBase { } assertCounters(errors = Map("setup" -> 1)) assertFalse(flag.get) + verify(processor, never).updateLastSuccessfulPoll(anyString, anyLong) } test("poll empty accounts") { @@ -256,6 +265,7 @@ class CloudWatchPollerSuite extends FunSuite with TestKitBase { Await.result(full.future, 60.seconds) assertCounters() assertFalse(flag.get) + verify(processor, times(1)).updateLastSuccessfulPoll(anyString, anyLong) } test("Poller#execute all success") {