From 473725f0b6230222cc4b3bdce48263f19f10420e Mon Sep 17 00:00:00 2001 From: Donal Evans Date: Tue, 1 Mar 2022 08:58:37 -0800 Subject: [PATCH] GEODE-10010: Improve accuracy of per-second Redis stats (#7358) Code changes: - Rather than a value that updates once per second, instantaneous operations per second and instantaneous kilobytes read per second now return a rolling average of those stats updated 16 times per second - Introduce RollingAverageStat nested class in RedisStats Test changes: - Rename AbstractRedisInfoStatsIntegrationTest to match child classes - Rather than measuring instantaneous per second stats after operations have finished, sample them while operations are ongoing and calculate the expected value based on the number of operations performed in the one second prior to sampling. - Change test tolerances from fixed value to 12.5% of expected value Authored-by: Donal Evans --- .../InfoStatsNativeRedisAcceptanceTest.java | 2 +- ... => AbstractInfoStatsIntegrationTest.java} | 159 +++++++++++++----- .../server/InfoStatsIntegrationTest.java | 6 +- .../redis/internal/statistics/RedisStats.java | 97 ++++++----- 4 files changed, 177 insertions(+), 87 deletions(-) rename geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/server/{AbstractRedisInfoStatsIntegrationTest.java => AbstractInfoStatsIntegrationTest.java} (61%) diff --git a/geode-for-redis/src/acceptanceTest/java/org/apache/geode/redis/internal/commands/executor/server/InfoStatsNativeRedisAcceptanceTest.java b/geode-for-redis/src/acceptanceTest/java/org/apache/geode/redis/internal/commands/executor/server/InfoStatsNativeRedisAcceptanceTest.java index 6e831f3565ac..c60a43e20b0b 100644 --- a/geode-for-redis/src/acceptanceTest/java/org/apache/geode/redis/internal/commands/executor/server/InfoStatsNativeRedisAcceptanceTest.java +++ 
b/geode-for-redis/src/acceptanceTest/java/org/apache/geode/redis/internal/commands/executor/server/InfoStatsNativeRedisAcceptanceTest.java @@ -20,7 +20,7 @@ import org.apache.geode.NativeRedisTestRule; -public class InfoStatsNativeRedisAcceptanceTest extends AbstractRedisInfoStatsIntegrationTest { +public class InfoStatsNativeRedisAcceptanceTest extends AbstractInfoStatsIntegrationTest { @ClassRule public static NativeRedisTestRule redis = new NativeRedisTestRule(); diff --git a/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/server/AbstractRedisInfoStatsIntegrationTest.java b/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/server/AbstractInfoStatsIntegrationTest.java similarity index 61% rename from geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/server/AbstractRedisInfoStatsIntegrationTest.java rename to geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/server/AbstractInfoStatsIntegrationTest.java index f32c6c06c2ef..e72709b3708c 100644 --- a/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/server/AbstractRedisInfoStatsIntegrationTest.java +++ b/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/server/AbstractInfoStatsIntegrationTest.java @@ -15,17 +15,25 @@ package org.apache.geode.redis.internal.commands.executor.server; import static org.apache.geode.test.awaitility.GeodeAwaitility.await; +import static org.apache.geode.test.dunit.rules.RedisClusterStartupRule.BIND_ADDRESS; +import static org.apache.geode.test.dunit.rules.RedisClusterStartupRule.REDIS_CLIENT_TIMEOUT; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.offset; +import static org.assertj.core.api.Assertions.withinPercentage; import java.time.Duration; import java.util.Map; +import 
java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import org.assertj.core.data.Offset; import org.junit.After; import org.junit.Before; import org.junit.BeforeClass; +import org.junit.Rule; import org.junit.Test; import redis.clients.jedis.Jedis; @@ -34,17 +42,19 @@ import org.apache.geode.redis.RedisIntegrationTest; import org.apache.geode.redis.RedisTestHelper; import org.apache.geode.test.awaitility.GeodeAwaitility; +import org.apache.geode.test.junit.rules.ExecutorServiceRule; -public abstract class AbstractRedisInfoStatsIntegrationTest implements RedisIntegrationTest { +public abstract class AbstractInfoStatsIntegrationTest implements RedisIntegrationTest { + @Rule + public ExecutorServiceRule executor = new ExecutorServiceRule(); - private static final int TIMEOUT = (int) GeodeAwaitility.getTimeout().toMillis(); private static final String EXISTING_HASH_KEY = "Existing_Hash"; private static final String EXISTING_STRING_KEY = "Existing_String"; private static final String EXISTING_SET_KEY_1 = "Existing_Set_1"; private static final String EXISTING_SET_KEY_2 = "Existing_Set_2"; private Jedis jedis; - private static long START_TIME; + private static long startTime; private static StatisticsClock statisticsClock; private long preTestConnectionsReceived = 0; @@ -73,12 +83,12 @@ public abstract class AbstractRedisInfoStatsIntegrationTest implements RedisInte @BeforeClass public static void beforeClass() { statisticsClock = new EnabledStatisticsClock(); - START_TIME = statisticsClock.getTime(); + startTime = statisticsClock.getTime(); } @Before public void before() { - jedis = new Jedis("localhost", getPort(), TIMEOUT); + jedis = new Jedis(BIND_ADDRESS, getPort(), REDIS_CLIENT_TIMEOUT); numInfoCalled.set(0); long preSetupCommandsProcessed = Long.parseLong(getInfo(jedis).get(COMMANDS_PROCESSED)); @@ -157,27 
+167,57 @@ public void commandsProcessed_shouldIncrement_givenSuccessfulCommand() { } @Test - public void opsPerformedOverLastSecond_ShouldUpdate_givenOperationsOccurring() { - int NUMBER_SECONDS_TO_RUN = 10; - AtomicInteger numberOfCommandsExecuted = new AtomicInteger(); - - await().during(Duration.ofSeconds(NUMBER_SECONDS_TO_RUN)).until(() -> { - jedis.set("key", "value"); - numberOfCommandsExecuted.getAndIncrement(); - return true; + public void opsPerformedOverLastSecond_ShouldUpdate_givenOperationsOccurring() + throws InterruptedException, ExecutionException, TimeoutException { + final long numberSecondsToRun = 4; + AtomicInteger totalOpsPerformed = new AtomicInteger(); + + final long startTime = System.currentTimeMillis(); + final long endTime = startTime + Duration.ofSeconds(numberSecondsToRun).toMillis(); + + // Take a sample in the middle of performing operations to help eliminate warmup as a factor + final long timeToSampleAt = startTime + Duration.ofSeconds(numberSecondsToRun / 2).toMillis(); + final long timeToGetBaselineOpsPerformed = timeToSampleAt - Duration.ofSeconds(1).toMillis(); + + // Execute commands in the background + Future executeCommands = executor.submit(() -> { + Jedis jedis2 = new Jedis(BIND_ADDRESS, getPort(), REDIS_CLIENT_TIMEOUT); + while (System.currentTimeMillis() < endTime) { + jedis2.set("key", "value"); + totalOpsPerformed.getAndIncrement(); + } + jedis2.close(); }); - double reportedCommandsPerLastSecond = + + // Record the total number of operations performed a second before we plan to sample. 
A poll + // interval less than the default of 100ms is used to increase the accuracy of the expected + // value, as the stats update the value of instantaneous per second values every 62.5ms + await().pollInterval(Duration.ofMillis(10)) + .until(() -> System.currentTimeMillis() >= timeToGetBaselineOpsPerformed); + final int opsPerformedUntilASecondBeforeSampling = totalOpsPerformed.get(); + + // Calculate how many operations were performed in the last second. A poll interval less than + // the default of 100ms is used to increase the accuracy of the expected value, as the stats + // update the value of instantaneous per second values every 62.5ms + await().pollInterval(Duration.ofMillis(10)) + .until(() -> System.currentTimeMillis() >= timeToSampleAt); + + final double reportedCommandsPerLastSecond = Double.parseDouble(getInfo(jedis).get(OPS_PERFORMED_OVER_LAST_SECOND)); - long expected = numberOfCommandsExecuted.get() / NUMBER_SECONDS_TO_RUN; + final int expected = totalOpsPerformed.get() - opsPerformedUntilASecondBeforeSampling; - assertThat(reportedCommandsPerLastSecond).isCloseTo(expected, Offset.offset(4.0)); + assertThat(reportedCommandsPerLastSecond).isCloseTo(expected, withinPercentage(12.5)); - // if time passes w/o operations - await().during(NUMBER_SECONDS_TO_RUN, TimeUnit.SECONDS).until(() -> true); + executeCommands.get(GeodeAwaitility.getTimeout().toMillis(), TimeUnit.MILLISECONDS); - assertThat(Double.valueOf(getInfo(jedis).get(OPS_PERFORMED_OVER_LAST_SECOND))) - .isCloseTo(0.0, Offset.offset(1.0)); + // Wait two seconds with no operations + Thread.sleep(2000); + + // Confirm that instantaneous operations per second returns to zero when no operations are being + // performed, with a small offset to account for the info command being executed + assertThat(Double.parseDouble(getInfo(jedis).get(OPS_PERFORMED_OVER_LAST_SECOND))).isCloseTo(0, + offset(1.0)); } @Test @@ -193,39 +233,72 @@ public void networkBytesRead_shouldIncrementBySizeOfCommandSent() { 
} @Test - public void networkKiloBytesReadOverLastSecond_shouldBeCloseToBytesReadOverLastSecond() { - - double REASONABLE_SOUNDING_OFFSET = .8; - int NUMBER_SECONDS_TO_RUN = 5; - String RESP_COMMAND_STRING = "*3\r\n$3\r\nset\r\n$3\r\nkey\r\n$5\r\nvalue\r\n"; - int BYTES_SENT_PER_COMMAND = RESP_COMMAND_STRING.length(); + public void networkKiloBytesReadOverLastSecond_shouldBeCloseToBytesReadOverLastSecond() + throws InterruptedException, ExecutionException, TimeoutException { + final int numberSecondsToRun = 4; + final String command = "set"; + final String key = "key"; + final String value = "value"; + final int bytesSentPerCommand = + ("*3\r\n$" + command.length() + "\r\n" + command + + "\r\n$" + key.length() + "\r\n" + key + + "\r\n$" + value.length() + "\r\n" + value + + "\r\n").length(); AtomicInteger totalBytesSent = new AtomicInteger(); - await().during(Duration.ofSeconds(NUMBER_SECONDS_TO_RUN)).until(() -> { - jedis.set("key", "value"); - totalBytesSent.addAndGet(BYTES_SENT_PER_COMMAND); - return true; + final long startTime = System.currentTimeMillis(); + final long endTime = startTime + Duration.ofSeconds(numberSecondsToRun).toMillis(); + + // Take a sample in the middle of performing operations to help eliminate warmup as a factor + final long timeToSampleAt = startTime + Duration.ofSeconds(numberSecondsToRun / 2).toMillis(); + final long timeToGetBaselineBytesSent = timeToSampleAt - Duration.ofSeconds(1).toMillis(); + + // Execute commands in the background + Future executeCommands = executor.submit(() -> { + Jedis jedis2 = new Jedis(BIND_ADDRESS, getPort(), REDIS_CLIENT_TIMEOUT); + while (System.currentTimeMillis() < endTime) { + jedis2.set(key, value); + totalBytesSent.addAndGet(bytesSentPerCommand); + } + jedis2.close(); }); - double actual_kbs = Double.parseDouble(getInfo(jedis).get(NETWORK_KB_READ_OVER_LAST_SECOND)); - double expected_kbs = ((double) totalBytesSent.get() / NUMBER_SECONDS_TO_RUN) / 1000; - 
assertThat(actual_kbs).isCloseTo(expected_kbs, Offset.offset(REASONABLE_SOUNDING_OFFSET)); + // Record the total number of KB sent a second before we plan to sample. A poll interval less + // than the default of 100ms is used to increase the accuracy of the expected value, as the + // stats update the value of instantaneous per second values every 62.5ms + await().pollInterval(Duration.ofMillis(10)) + .until(() -> System.currentTimeMillis() >= timeToGetBaselineBytesSent); + final int bytesSentUntilASecondBeforeSampling = totalBytesSent.get(); + + // Calculate how many KB were sent in the last second. A poll interval less than the default of + // 100ms is used to increase the accuracy of the expected value, as the stats update the value + // of instantaneous per second values every 62.5ms + await().pollInterval(Duration.ofMillis(10)) + .until(() -> System.currentTimeMillis() >= timeToSampleAt); + + final double reportedKBReadPerLastSecond = + Double.parseDouble(getInfo(jedis).get(NETWORK_KB_READ_OVER_LAST_SECOND)); + + final double expected = (totalBytesSent.get() - bytesSentUntilASecondBeforeSampling) / 1024.0; + + assertThat(reportedKBReadPerLastSecond).isCloseTo(expected, withinPercentage(12.5)); + + executeCommands.get(GeodeAwaitility.getTimeout().toMillis(), TimeUnit.MILLISECONDS); - // if time passes w/o operations - await().during(NUMBER_SECONDS_TO_RUN, TimeUnit.SECONDS) - .until(() -> true); + // Wait two seconds with no operations + Thread.sleep(2000); - // Kb/s should eventually drop to 0 or at least very close since just executing the info - // command may result in the value increasing. 
- assertThat(Double.valueOf(getInfo(jedis).get(NETWORK_KB_READ_OVER_LAST_SECOND))) - .isCloseTo(0.0, Offset.offset(0.1)); + // Confirm that instantaneous KB read per second returns to zero when no operations are being + // performed, with a small offset to account for the info command being executed + assertThat(Double.parseDouble(getInfo(jedis).get(NETWORK_KB_READ_OVER_LAST_SECOND))) + .isCloseTo(0, offset(0.02)); } // ------------------- Clients Section -------------------------- // @Test public void connectedClients_incrAndDecrWhenClientConnectsAndDisconnects() { - Jedis jedis2 = new Jedis("localhost", getPort(), TIMEOUT); + Jedis jedis2 = new Jedis("localhost", getPort(), REDIS_CLIENT_TIMEOUT); jedis2.ping(); validateConnectedClients(jedis, preTestConnectedClients, 1); @@ -237,7 +310,7 @@ public void connectedClients_incrAndDecrWhenClientConnectsAndDisconnects() { @Test public void totalConnectionsReceivedStat_shouldIncrement_whenNewConnectionOccurs() { - Jedis jedis2 = new Jedis("localhost", getPort(), TIMEOUT); + Jedis jedis2 = new Jedis("localhost", getPort(), REDIS_CLIENT_TIMEOUT); jedis2.ping(); validateConnectionsReceived(jedis, preTestConnectionsReceived, 1); @@ -276,7 +349,7 @@ public void uptimeInSeconds_shouldReturnTimeSinceStartInSeconds() { // ------------------- Helper Methods ----------------------------- // public long getStartTime() { - return START_TIME; + return startTime; } public long getCurrentTime() { diff --git a/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/server/InfoStatsIntegrationTest.java b/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/server/InfoStatsIntegrationTest.java index dd34917a87cb..675c48dca1ae 100644 --- a/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/server/InfoStatsIntegrationTest.java +++ 
b/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/server/InfoStatsIntegrationTest.java @@ -20,7 +20,7 @@ import org.apache.geode.redis.GeodeRedisServerRule; -public class InfoStatsIntegrationTest extends AbstractRedisInfoStatsIntegrationTest { +public class InfoStatsIntegrationTest extends AbstractInfoStatsIntegrationTest { @ClassRule public static GeodeRedisServerRule server = new GeodeRedisServerRule(); @@ -35,7 +35,5 @@ public int getExposedPort() { } @Override - public void configureMaxMemory(Jedis jedis) { - return; - } + public void configureMaxMemory(Jedis jedis) {} } diff --git a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/statistics/RedisStats.java b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/statistics/RedisStats.java index 35d955739ad9..91b8e7f0f980 100644 --- a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/statistics/RedisStats.java +++ b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/statistics/RedisStats.java @@ -16,15 +16,18 @@ package org.apache.geode.redis.internal.statistics; -import static java.util.concurrent.TimeUnit.SECONDS; +import static java.util.concurrent.TimeUnit.MICROSECONDS; import static org.apache.geode.internal.statistics.StatisticsClockFactory.getTime; import static org.apache.geode.logging.internal.executors.LoggingExecutors.newSingleThreadScheduledExecutor; +import java.util.Arrays; +import java.util.concurrent.Callable; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import org.apache.geode.internal.statistics.StatisticsClock; +import org.apache.geode.redis.internal.RedisException; import org.apache.geode.redis.internal.commands.RedisCommandType; public class RedisStats { @@ -37,23 +40,25 @@ public class RedisStats { private final AtomicLong keyspaceMisses = new AtomicLong(); private final AtomicLong uniqueChannelSubscriptions = 
new AtomicLong(); private final AtomicLong uniquePatternSubscriptions = new AtomicLong(); - private final ScheduledExecutorService perSecondExecutor; + + private final ScheduledExecutorService rollingAverageExecutor; + private static final int ROLLING_AVERAGE_SAMPLES_PER_SECOND = 16; + private final RollingAverageStat networkBytesReadRollingAverageStat = + new RollingAverageStat(ROLLING_AVERAGE_SAMPLES_PER_SECOND, this::getTotalNetworkBytesRead); private volatile double networkKiloBytesReadOverLastSecond; - private volatile long opsPerformedLastTick; - private double opsPerformedOverLastSecond; - private long previousNetworkBytesRead; + private final RollingAverageStat opsPerformedRollingAverageStat = + new RollingAverageStat(ROLLING_AVERAGE_SAMPLES_PER_SECOND, this::getCommandsProcessed); + private volatile double opsPerformedOverLastSecond; + private final StatisticsClock clock; private final GeodeRedisStats geodeRedisStats; - private final long START_TIME_IN_NANOS; - - - public RedisStats(StatisticsClock clock, - GeodeRedisStats geodeRedisStats) { + private final long startTimeInNanos; + public RedisStats(StatisticsClock clock, GeodeRedisStats geodeRedisStats) { this.clock = clock; this.geodeRedisStats = geodeRedisStats; - perSecondExecutor = startPerSecondUpdater(); - START_TIME_IN_NANOS = clock.getTime(); + rollingAverageExecutor = startRollingAverageUpdater(); + startTimeInNanos = clock.getTime(); } public void incCommandsProcessed() { @@ -114,7 +119,7 @@ public long getCurrentTimeNanos() { } public long getUptimeInMilliseconds() { - long uptimeInNanos = getCurrentTimeNanos() - START_TIME_IN_NANOS; + long uptimeInNanos = getCurrentTimeNanos() - startTimeInNanos; return TimeUnit.NANOSECONDS.toMillis(uptimeInNanos); } @@ -193,43 +198,57 @@ public void changeUniquePatternSubscriptions(long delta) { public void close() { geodeRedisStats.close(); - stopPerSecondUpdater(); + stopRollingAverageUpdater(); } - private ScheduledExecutorService startPerSecondUpdater() { 
-    int INTERVAL = 1;
+  private ScheduledExecutorService startRollingAverageUpdater() {
+    long microsPerSecond = 1_000_000;
+    final long delayMicros = microsPerSecond / ROLLING_AVERAGE_SAMPLES_PER_SECOND;
 
-    ScheduledExecutorService perSecondExecutor =
-        newSingleThreadScheduledExecutor("GemFireRedis-PerSecondUpdater-");
+    ScheduledExecutorService rollingAverageExecutor =
+        newSingleThreadScheduledExecutor("GemFireRedis-RollingAverageStatUpdater-");
 
-    perSecondExecutor.scheduleWithFixedDelay(
-        this::doPerSecondUpdates,
-        INTERVAL,
-        INTERVAL,
-        SECONDS);
+    rollingAverageExecutor.scheduleWithFixedDelay(this::doRollingAverageUpdates, delayMicros,
+        delayMicros, MICROSECONDS);
 
-    return perSecondExecutor;
+    return rollingAverageExecutor;
   }
 
-  private void stopPerSecondUpdater() {
-    perSecondExecutor.shutdownNow();
+  private void stopRollingAverageUpdater() {
+    rollingAverageExecutor.shutdownNow();
   }
 
-  private void doPerSecondUpdates() {
-    updateNetworkKilobytesReadLastSecond();
-    updateOpsPerformedOverLastSecond();
+  private void doRollingAverageUpdates() {
+    networkKiloBytesReadOverLastSecond = networkBytesReadRollingAverageStat.calculate() / 1024.0;
+    opsPerformedOverLastSecond = opsPerformedRollingAverageStat.calculate();
   }
 
-  private void updateNetworkKilobytesReadLastSecond() {
-    final long totalNetworkBytesRead = getTotalNetworkBytesRead();
-    long deltaNetworkBytesRead = totalNetworkBytesRead - previousNetworkBytesRead;
-    networkKiloBytesReadOverLastSecond = deltaNetworkBytesRead / 1024.0;
-    previousNetworkBytesRead = totalNetworkBytesRead;
-  }
+  private static class RollingAverageStat {
+    private int tickNumber = 0;
+    private long valueReadLastTick;
+    private final long[] valuesReadOverLastNSamples;
+    private final Callable<Long> statCallable;
+
+    private RollingAverageStat(int samplesPerSecond, Callable<Long> getCurrentValue) {
+      valuesReadOverLastNSamples = new long[samplesPerSecond];
+      statCallable = getCurrentValue;
+    }
 
-  private void updateOpsPerformedOverLastSecond() {
-    final long totalOpsPerformed = getCommandsProcessed();
-    opsPerformedOverLastSecond = totalOpsPerformed - opsPerformedLastTick;
-    opsPerformedLastTick = totalOpsPerformed;
+    long calculate() {
+      long currentValue;
+      try {
+        currentValue = statCallable.call();
+      } catch (Exception e) {
+        throw new RedisException("Error while calculating RollingAverage stats", e);
+      }
+      long delta = currentValue - valueReadLastTick;
+      valueReadLastTick = currentValue;
+      valuesReadOverLastNSamples[tickNumber] = delta;
+      tickNumber++;
+      if (tickNumber >= valuesReadOverLastNSamples.length) {
+        tickNumber = 0;
+      }
+      return Arrays.stream(valuesReadOverLastNSamples).sum();
+    }
   }
 }