From 35e44946175d16d9d17c908a2e0a263497586dff Mon Sep 17 00:00:00 2001 From: Donal Evans Date: Thu, 10 Feb 2022 12:08:04 -0800 Subject: [PATCH] GEODE-10010: Calculate per-second Redis stats as rolling average - Rather than a value that updates once per second, instantaneous operations per second and instantaneous kilobytes read per second now return a rolling average of those stats updated 16 times per second - Rename AbstractRedisInfoStatsIntegrationTest to match child classes - Rather than measuring instantaneous per second stats after operations have finished, sample them while operations are ongoing and calculate the expected value based on the number of operations performed in the one second prior to sampling. Authored-by: Donal Evans --- .../InfoStatsNativeRedisAcceptanceTest.java | 2 +- ... => AbstractInfoStatsIntegrationTest.java} | 155 +++++++++++++----- .../server/InfoStatsIntegrationTest.java | 2 +- .../redis/internal/statistics/RedisStats.java | 75 +++++---- 4 files changed, 159 insertions(+), 75 deletions(-) rename geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/server/{AbstractRedisInfoStatsIntegrationTest.java => AbstractInfoStatsIntegrationTest.java} (61%) diff --git a/geode-for-redis/src/acceptanceTest/java/org/apache/geode/redis/internal/commands/executor/server/InfoStatsNativeRedisAcceptanceTest.java b/geode-for-redis/src/acceptanceTest/java/org/apache/geode/redis/internal/commands/executor/server/InfoStatsNativeRedisAcceptanceTest.java index 6e831f3565ac..c60a43e20b0b 100644 --- a/geode-for-redis/src/acceptanceTest/java/org/apache/geode/redis/internal/commands/executor/server/InfoStatsNativeRedisAcceptanceTest.java +++ b/geode-for-redis/src/acceptanceTest/java/org/apache/geode/redis/internal/commands/executor/server/InfoStatsNativeRedisAcceptanceTest.java @@ -20,7 +20,7 @@ import org.apache.geode.NativeRedisTestRule; -public class InfoStatsNativeRedisAcceptanceTest extends 
AbstractRedisInfoStatsIntegrationTest { +public class InfoStatsNativeRedisAcceptanceTest extends AbstractInfoStatsIntegrationTest { @ClassRule public static NativeRedisTestRule redis = new NativeRedisTestRule(); diff --git a/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/server/AbstractRedisInfoStatsIntegrationTest.java b/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/server/AbstractInfoStatsIntegrationTest.java similarity index 61% rename from geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/server/AbstractRedisInfoStatsIntegrationTest.java rename to geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/server/AbstractInfoStatsIntegrationTest.java index f32c6c06c2ef..b2673deb78c0 100644 --- a/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/server/AbstractRedisInfoStatsIntegrationTest.java +++ b/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/server/AbstractInfoStatsIntegrationTest.java @@ -15,17 +15,25 @@ package org.apache.geode.redis.internal.commands.executor.server; import static org.apache.geode.test.awaitility.GeodeAwaitility.await; +import static org.apache.geode.test.dunit.rules.RedisClusterStartupRule.BIND_ADDRESS; +import static org.apache.geode.test.dunit.rules.RedisClusterStartupRule.REDIS_CLIENT_TIMEOUT; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.offset; +import static org.assertj.core.api.Assertions.withinPercentage; import java.time.Duration; import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import org.assertj.core.data.Offset; import org.junit.After; 
import org.junit.Before; import org.junit.BeforeClass; +import org.junit.Rule; import org.junit.Test; import redis.clients.jedis.Jedis; @@ -34,17 +42,19 @@ import org.apache.geode.redis.RedisIntegrationTest; import org.apache.geode.redis.RedisTestHelper; import org.apache.geode.test.awaitility.GeodeAwaitility; +import org.apache.geode.test.junit.rules.ExecutorServiceRule; -public abstract class AbstractRedisInfoStatsIntegrationTest implements RedisIntegrationTest { +public abstract class AbstractInfoStatsIntegrationTest implements RedisIntegrationTest { + @Rule + public ExecutorServiceRule executor = new ExecutorServiceRule(); - private static final int TIMEOUT = (int) GeodeAwaitility.getTimeout().toMillis(); private static final String EXISTING_HASH_KEY = "Existing_Hash"; private static final String EXISTING_STRING_KEY = "Existing_String"; private static final String EXISTING_SET_KEY_1 = "Existing_Set_1"; private static final String EXISTING_SET_KEY_2 = "Existing_Set_2"; private Jedis jedis; - private static long START_TIME; + private static long startTime; private static StatisticsClock statisticsClock; private long preTestConnectionsReceived = 0; @@ -73,12 +83,12 @@ public abstract class AbstractRedisInfoStatsIntegrationTest implements RedisInte @BeforeClass public static void beforeClass() { statisticsClock = new EnabledStatisticsClock(); - START_TIME = statisticsClock.getTime(); + startTime = statisticsClock.getTime(); } @Before public void before() { - jedis = new Jedis("localhost", getPort(), TIMEOUT); + jedis = new Jedis(BIND_ADDRESS, getPort(), REDIS_CLIENT_TIMEOUT); numInfoCalled.set(0); long preSetupCommandsProcessed = Long.parseLong(getInfo(jedis).get(COMMANDS_PROCESSED)); @@ -157,27 +167,56 @@ public void commandsProcessed_shouldIncrement_givenSuccessfulCommand() { } @Test - public void opsPerformedOverLastSecond_ShouldUpdate_givenOperationsOccurring() { - int NUMBER_SECONDS_TO_RUN = 10; - AtomicInteger numberOfCommandsExecuted = new AtomicInteger(); - 
- await().during(Duration.ofSeconds(NUMBER_SECONDS_TO_RUN)).until(() -> { - jedis.set("key", "value"); - numberOfCommandsExecuted.getAndIncrement(); - return true; + public void opsPerformedOverLastSecond_ShouldUpdate_givenOperationsOccurring() + throws InterruptedException, ExecutionException, TimeoutException { + long numberSecondsToRun = 4; + AtomicInteger totalOpsPerformed = new AtomicInteger(); + + long startTime = System.currentTimeMillis(); + long endTime = startTime + Duration.ofSeconds(numberSecondsToRun).toMillis(); + + // Take a sample in the middle of performing operations to help eliminate warmup as a factor + long timeToSampleAt = startTime + Duration.ofSeconds(numberSecondsToRun / 2).toMillis(); + long timeToGetBaselineOpsPerformed = timeToSampleAt - Duration.ofSeconds(1).toMillis(); + + // Execute commands in the background + Future executeCommands = executor.submit(() -> { + Jedis jedis2 = new Jedis(BIND_ADDRESS, getPort(), REDIS_CLIENT_TIMEOUT); + while (System.currentTimeMillis() < endTime) { + jedis2.set("key", "value"); + totalOpsPerformed.getAndIncrement(); + } + jedis2.close(); }); + + // Record the total number of operations performed a second before we plan to sample. A poll + // interval less than the default of 100ms is used to increase the accuracy of the expected + // value, as the stats update the value of instantaneous per second values every 62.5ms + await().pollInterval(Duration.ofMillis(50)) + .until(() -> System.currentTimeMillis() >= timeToGetBaselineOpsPerformed); + int opsPerformedUntilASecondBeforeSampling = totalOpsPerformed.get(); + + // Calculate how many operations were performed in the last second. 
A poll interval less than + // the default of 100ms is used to increase the accuracy of the expected value, as the stats + // update the value of instantaneous per second values every 62.5ms + await().pollInterval(Duration.ofMillis(50)) + .until(() -> System.currentTimeMillis() >= timeToSampleAt); + int expected = totalOpsPerformed.get() - opsPerformedUntilASecondBeforeSampling; + double reportedCommandsPerLastSecond = Double.parseDouble(getInfo(jedis).get(OPS_PERFORMED_OVER_LAST_SECOND)); - long expected = numberOfCommandsExecuted.get() / NUMBER_SECONDS_TO_RUN; + assertThat(reportedCommandsPerLastSecond).isCloseTo(expected, withinPercentage(10)); - assertThat(reportedCommandsPerLastSecond).isCloseTo(expected, Offset.offset(4.0)); + executeCommands.get(GeodeAwaitility.getTimeout().toMillis(), TimeUnit.MILLISECONDS); - // if time passes w/o operations - await().during(NUMBER_SECONDS_TO_RUN, TimeUnit.SECONDS).until(() -> true); + // Wait two seconds with no operations + Thread.sleep(2000); - assertThat(Double.valueOf(getInfo(jedis).get(OPS_PERFORMED_OVER_LAST_SECOND))) - .isCloseTo(0.0, Offset.offset(1.0)); + // Confirm that instantaneous operations per second returns to zero when no operations are being + // performed, with a small offset to account for the info command being executed + assertThat(Double.parseDouble(getInfo(jedis).get(OPS_PERFORMED_OVER_LAST_SECOND))).isCloseTo(0, + offset(1.0)); } @Test @@ -193,39 +232,71 @@ public void networkBytesRead_shouldIncrementBySizeOfCommandSent() { } @Test - public void networkKiloBytesReadOverLastSecond_shouldBeCloseToBytesReadOverLastSecond() { - - double REASONABLE_SOUNDING_OFFSET = .8; - int NUMBER_SECONDS_TO_RUN = 5; - String RESP_COMMAND_STRING = "*3\r\n$3\r\nset\r\n$3\r\nkey\r\n$5\r\nvalue\r\n"; - int BYTES_SENT_PER_COMMAND = RESP_COMMAND_STRING.length(); + public void networkKiloBytesReadOverLastSecond_shouldBeCloseToBytesReadOverLastSecond() + throws InterruptedException, ExecutionException, TimeoutException { + 
int numberSecondsToRun = 4; + String command = "set"; + String key = "key"; + String value = "value"; + int bytesSentPerCommand = + ("*3\r\n$" + command.length() + "\r\n" + command + + "\r\n$" + key.length() + "\r\n" + key + + "\r\n$" + value.length() + "\r\n" + value + + "\r\n").length(); AtomicInteger totalBytesSent = new AtomicInteger(); - await().during(Duration.ofSeconds(NUMBER_SECONDS_TO_RUN)).until(() -> { - jedis.set("key", "value"); - totalBytesSent.addAndGet(BYTES_SENT_PER_COMMAND); - return true; + long startTime = System.currentTimeMillis(); + long endTime = startTime + Duration.ofSeconds(numberSecondsToRun).toMillis(); + + // Take a sample in the middle of performing operations to help eliminate warmup as a factor + long timeToSampleAt = startTime + Duration.ofSeconds(numberSecondsToRun / 2).toMillis(); + long timeToGetBaselineBytesSent = timeToSampleAt - Duration.ofSeconds(1).toMillis(); + + // Execute commands in the background + Future executeCommands = executor.submit(() -> { + Jedis jedis2 = new Jedis(BIND_ADDRESS, getPort(), REDIS_CLIENT_TIMEOUT); + while (System.currentTimeMillis() < endTime) { + jedis2.set(key, value); + totalBytesSent.addAndGet(bytesSentPerCommand); + } + jedis2.close(); }); - double actual_kbs = Double.parseDouble(getInfo(jedis).get(NETWORK_KB_READ_OVER_LAST_SECOND)); - double expected_kbs = ((double) totalBytesSent.get() / NUMBER_SECONDS_TO_RUN) / 1000; - assertThat(actual_kbs).isCloseTo(expected_kbs, Offset.offset(REASONABLE_SOUNDING_OFFSET)); + // Record the total number of bytes sent a second before we plan to sample.
A poll interval less + // than the default of 100ms is used to increase the accuracy of the expected value, as the + // stats update the value of instantaneous per second values every 62.5ms + await().pollInterval(Duration.ofMillis(50)) + .until(() -> System.currentTimeMillis() >= timeToGetBaselineBytesSent); + int bytesSentUntilASecondBeforeSampling = totalBytesSent.get(); + + // Calculate how many KB were sent in the last second. A poll interval less than the default of + // 100ms is used to increase the accuracy of the expected value, as the stats update the value + // of instantaneous per second values every 62.5ms + await().pollInterval(Duration.ofMillis(50)) + .until(() -> System.currentTimeMillis() >= timeToSampleAt); + double expected = (totalBytesSent.get() - bytesSentUntilASecondBeforeSampling) / 1024.0; + + double reportedKBReadPerLastSecond = + Double.parseDouble(getInfo(jedis).get(NETWORK_KB_READ_OVER_LAST_SECOND)); + + assertThat(reportedKBReadPerLastSecond).isCloseTo(expected, withinPercentage(10)); + + executeCommands.get(GeodeAwaitility.getTimeout().toMillis(), TimeUnit.MILLISECONDS); - // if time passes w/o operations - await().during(NUMBER_SECONDS_TO_RUN, TimeUnit.SECONDS) - .until(() -> true); + // Wait two seconds with no operations + Thread.sleep(2000); - // Kb/s should eventually drop to 0 or at least very close since just executing the info - // command may result in the value increasing. 
- assertThat(Double.valueOf(getInfo(jedis).get(NETWORK_KB_READ_OVER_LAST_SECOND))) - .isCloseTo(0.0, Offset.offset(0.1)); + // Confirm that instantaneous KB read per second returns to zero when no operations are being + // performed, with a small offset to account for the info command being executed + assertThat(Double.parseDouble(getInfo(jedis).get(NETWORK_KB_READ_OVER_LAST_SECOND))) + .isCloseTo(0, offset(0.02)); } // ------------------- Clients Section -------------------------- // @Test public void connectedClients_incrAndDecrWhenClientConnectsAndDisconnects() { - Jedis jedis2 = new Jedis("localhost", getPort(), TIMEOUT); + Jedis jedis2 = new Jedis("localhost", getPort(), REDIS_CLIENT_TIMEOUT); jedis2.ping(); validateConnectedClients(jedis, preTestConnectedClients, 1); @@ -237,7 +308,7 @@ public void connectedClients_incrAndDecrWhenClientConnectsAndDisconnects() { @Test public void totalConnectionsReceivedStat_shouldIncrement_whenNewConnectionOccurs() { - Jedis jedis2 = new Jedis("localhost", getPort(), TIMEOUT); + Jedis jedis2 = new Jedis("localhost", getPort(), REDIS_CLIENT_TIMEOUT); jedis2.ping(); validateConnectionsReceived(jedis, preTestConnectionsReceived, 1); @@ -276,7 +347,7 @@ public void uptimeInSeconds_shouldReturnTimeSinceStartInSeconds() { // ------------------- Helper Methods ----------------------------- // public long getStartTime() { - return START_TIME; + return startTime; } public long getCurrentTime() { diff --git a/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/server/InfoStatsIntegrationTest.java b/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/server/InfoStatsIntegrationTest.java index dd34917a87cb..adbf91dc76ba 100644 --- a/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/server/InfoStatsIntegrationTest.java +++ 
b/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/server/InfoStatsIntegrationTest.java @@ -20,7 +20,7 @@ import org.apache.geode.redis.GeodeRedisServerRule; -public class InfoStatsIntegrationTest extends AbstractRedisInfoStatsIntegrationTest { +public class InfoStatsIntegrationTest extends AbstractInfoStatsIntegrationTest { @ClassRule public static GeodeRedisServerRule server = new GeodeRedisServerRule(); diff --git a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/statistics/RedisStats.java b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/statistics/RedisStats.java index b4317d84f130..0af5ad59f99e 100644 --- a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/statistics/RedisStats.java +++ b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/statistics/RedisStats.java @@ -16,10 +16,11 @@ package org.apache.geode.redis.internal.statistics; -import static java.util.concurrent.TimeUnit.SECONDS; +import static java.util.concurrent.TimeUnit.MICROSECONDS; import static org.apache.geode.internal.statistics.StatisticsClockFactory.getTime; import static org.apache.geode.logging.internal.executors.LoggingExecutors.newSingleThreadScheduledExecutor; +import java.util.Arrays; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -37,14 +38,20 @@ public class RedisStats { private final AtomicLong keyspaceMisses = new AtomicLong(); private final AtomicLong uniqueChannelSubscriptions = new AtomicLong(); private final AtomicLong uniquePatternSubscriptions = new AtomicLong(); - private final ScheduledExecutorService perSecondExecutor; + + private final int rollingAverageSamplesPerSecond = 16; + private final ScheduledExecutorService rollingAverageExecutor; private volatile double networkKiloBytesReadOverLastSecond; - private volatile long opsPerformedLastTick; + private final long[] 
networkBytesReadOverLastNSamples = new long[rollingAverageSamplesPerSecond]; + private long totalNetworkBytesReadLastTick; private double opsPerformedOverLastSecond; - private long previousNetworkBytesRead; + private volatile long totalOpsPerformedLastTick; + private final long[] opsPerformedOverLastNSamples = new long[rollingAverageSamplesPerSecond]; + private int rollingAverageTick = 0; + private final StatisticsClock clock; private final GeodeRedisStats geodeRedisStats; - private final long START_TIME_IN_NANOS; + private final long startTimeInNanos; public RedisStats(StatisticsClock clock, @@ -52,8 +59,8 @@ public RedisStats(StatisticsClock clock, this.clock = clock; this.geodeRedisStats = geodeRedisStats; - perSecondExecutor = startPerSecondUpdater(); - START_TIME_IN_NANOS = clock.getTime(); + rollingAverageExecutor = startRollingAverageUpdater(); + startTimeInNanos = clock.getTime(); } public void incCommandsProcessed() { @@ -114,7 +121,7 @@ public long getCurrentTimeNanos() { } public long getUptimeInMilliseconds() { - long uptimeInNanos = getCurrentTimeNanos() - START_TIME_IN_NANOS; + long uptimeInNanos = getCurrentTimeNanos() - startTimeInNanos; return TimeUnit.NANOSECONDS.toMillis(uptimeInNanos); } @@ -194,43 +201,49 @@ public void changeUniquePatternSubscriptions(long delta) { public void close() { geodeRedisStats.close(); - stopPerSecondUpdater(); + stopRollingAverageUpdater(); } - private ScheduledExecutorService startPerSecondUpdater() { - int INTERVAL = 1; + private ScheduledExecutorService startRollingAverageUpdater() { + long microsPerSecond = 1_000_000; + final long delayMicros = microsPerSecond / rollingAverageSamplesPerSecond; - ScheduledExecutorService perSecondExecutor = - newSingleThreadScheduledExecutor("GemFireRedis-PerSecondUpdater-"); + ScheduledExecutorService rollingAverageExecutor = + newSingleThreadScheduledExecutor("GemFireRedis-RollingAverageStatUpdater-"); - perSecondExecutor.scheduleWithFixedDelay( - this::doPerSecondUpdates, - 
INTERVAL, - INTERVAL, - SECONDS); + rollingAverageExecutor.scheduleWithFixedDelay(this::doRollingAverageUpdates, delayMicros, + delayMicros, MICROSECONDS); - return perSecondExecutor; + return rollingAverageExecutor; } - private void stopPerSecondUpdater() { - perSecondExecutor.shutdownNow(); + private void stopRollingAverageUpdater() { + rollingAverageExecutor.shutdownNow(); } - private void doPerSecondUpdates() { - updateNetworkKilobytesReadLastSecond(); - updateOpsPerformedOverLastSecond(); + private void doRollingAverageUpdates() { + updateNetworkKilobytesReadLastSecond(rollingAverageTick); + updateOpsPerformedOverLastSecond(rollingAverageTick); + rollingAverageTick++; + if (rollingAverageTick >= rollingAverageSamplesPerSecond) { + rollingAverageTick = 0; + } } - private void updateNetworkKilobytesReadLastSecond() { + private void updateNetworkKilobytesReadLastSecond(int tickNumber) { final long totalNetworkBytesRead = getTotalNetworkBytesRead(); - long deltaNetworkBytesRead = totalNetworkBytesRead - previousNetworkBytesRead; - networkKiloBytesReadOverLastSecond = deltaNetworkBytesRead / 1024.0; - previousNetworkBytesRead = totalNetworkBytesRead; + long deltaNetworkBytesRead = totalNetworkBytesRead - totalNetworkBytesReadLastTick; + networkBytesReadOverLastNSamples[tickNumber] = deltaNetworkBytesRead; + networkKiloBytesReadOverLastSecond = + Arrays.stream(networkBytesReadOverLastNSamples).sum() / 1024.0; + totalNetworkBytesReadLastTick = totalNetworkBytesRead; } - private void updateOpsPerformedOverLastSecond() { + private void updateOpsPerformedOverLastSecond(int tickNumber) { final long totalOpsPerformed = getCommandsProcessed(); - opsPerformedOverLastSecond = totalOpsPerformed - opsPerformedLastTick; - opsPerformedLastTick = totalOpsPerformed; + long deltaOpsPerformed = totalOpsPerformed - totalOpsPerformedLastTick; + opsPerformedOverLastNSamples[tickNumber] = deltaOpsPerformed; + opsPerformedOverLastSecond = 
Arrays.stream(opsPerformedOverLastNSamples).sum(); + totalOpsPerformedLastTick = totalOpsPerformed; } }