diff --git a/geode-for-redis/src/acceptanceTest/java/org/apache/geode/redis/internal/commands/executor/server/InfoStatsNativeRedisAcceptanceTest.java b/geode-for-redis/src/acceptanceTest/java/org/apache/geode/redis/internal/commands/executor/server/InfoStatsNativeRedisAcceptanceTest.java index 6e831f3565ac..c60a43e20b0b 100644 --- a/geode-for-redis/src/acceptanceTest/java/org/apache/geode/redis/internal/commands/executor/server/InfoStatsNativeRedisAcceptanceTest.java +++ b/geode-for-redis/src/acceptanceTest/java/org/apache/geode/redis/internal/commands/executor/server/InfoStatsNativeRedisAcceptanceTest.java @@ -20,7 +20,7 @@ import org.apache.geode.NativeRedisTestRule; -public class InfoStatsNativeRedisAcceptanceTest extends AbstractRedisInfoStatsIntegrationTest { +public class InfoStatsNativeRedisAcceptanceTest extends AbstractInfoStatsIntegrationTest { @ClassRule public static NativeRedisTestRule redis = new NativeRedisTestRule(); diff --git a/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/server/AbstractRedisInfoStatsIntegrationTest.java b/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/server/AbstractInfoStatsIntegrationTest.java similarity index 61% rename from geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/server/AbstractRedisInfoStatsIntegrationTest.java rename to geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/server/AbstractInfoStatsIntegrationTest.java index f32c6c06c2ef..b2673deb78c0 100644 --- a/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/server/AbstractRedisInfoStatsIntegrationTest.java +++ b/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/server/AbstractInfoStatsIntegrationTest.java @@ -15,17 +15,25 @@ package org.apache.geode.redis.internal.commands.executor.server; import static org.apache.geode.test.awaitility.GeodeAwaitility.await; +import static org.apache.geode.test.dunit.rules.RedisClusterStartupRule.BIND_ADDRESS; +import static org.apache.geode.test.dunit.rules.RedisClusterStartupRule.REDIS_CLIENT_TIMEOUT; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.offset; +import static org.assertj.core.api.Assertions.withinPercentage; import java.time.Duration; import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import org.assertj.core.data.Offset; import org.junit.After; import org.junit.Before; import org.junit.BeforeClass; +import org.junit.Rule; import org.junit.Test; import redis.clients.jedis.Jedis; @@ -34,17 +42,19 @@ import org.apache.geode.redis.RedisIntegrationTest; import org.apache.geode.redis.RedisTestHelper; import org.apache.geode.test.awaitility.GeodeAwaitility; +import org.apache.geode.test.junit.rules.ExecutorServiceRule; -public abstract class AbstractRedisInfoStatsIntegrationTest implements RedisIntegrationTest { +public abstract class AbstractInfoStatsIntegrationTest implements RedisIntegrationTest { + @Rule + public ExecutorServiceRule executor = new ExecutorServiceRule(); - private static final int TIMEOUT = (int) GeodeAwaitility.getTimeout().toMillis(); private static final String EXISTING_HASH_KEY = "Existing_Hash"; private static final String EXISTING_STRING_KEY = "Existing_String"; private static final String EXISTING_SET_KEY_1 = "Existing_Set_1"; private static final String EXISTING_SET_KEY_2 = "Existing_Set_2"; private Jedis jedis; - private static long START_TIME; + private static long startTime; private static StatisticsClock statisticsClock; private long preTestConnectionsReceived = 0; @@ -73,12 +83,12 @@ public abstract class AbstractRedisInfoStatsIntegrationTest implements RedisInte @BeforeClass public static void beforeClass() { statisticsClock = new EnabledStatisticsClock(); - START_TIME = statisticsClock.getTime(); + startTime = statisticsClock.getTime(); } @Before public void before() { - jedis = new Jedis("localhost", getPort(), TIMEOUT); + jedis = new Jedis(BIND_ADDRESS, getPort(), REDIS_CLIENT_TIMEOUT); numInfoCalled.set(0); long preSetupCommandsProcessed = Long.parseLong(getInfo(jedis).get(COMMANDS_PROCESSED)); @@ -157,27 +167,56 @@ public void commandsProcessed_shouldIncrement_givenSuccessfulCommand() { } @Test - public void opsPerformedOverLastSecond_ShouldUpdate_givenOperationsOccurring() { - int NUMBER_SECONDS_TO_RUN = 10; - AtomicInteger numberOfCommandsExecuted = new AtomicInteger(); - - await().during(Duration.ofSeconds(NUMBER_SECONDS_TO_RUN)).until(() -> { - jedis.set("key", "value"); - numberOfCommandsExecuted.getAndIncrement(); - return true; + public void opsPerformedOverLastSecond_ShouldUpdate_givenOperationsOccurring() + throws InterruptedException, ExecutionException, TimeoutException { + long numberSecondsToRun = 4; + AtomicInteger totalOpsPerformed = new AtomicInteger(); + + long startTime = System.currentTimeMillis(); + long endTime = startTime + Duration.ofSeconds(numberSecondsToRun).toMillis(); + + // Take a sample in the middle of performing operations to help eliminate warmup as a factor + long timeToSampleAt = startTime + Duration.ofSeconds(numberSecondsToRun / 2).toMillis(); + long timeToGetBaselineOpsPerformed = timeToSampleAt - Duration.ofSeconds(1).toMillis(); + + // Execute commands in the background + Future executeCommands = executor.submit(() -> { + Jedis jedis2 = new Jedis(BIND_ADDRESS, getPort(), REDIS_CLIENT_TIMEOUT); + while (System.currentTimeMillis() < endTime) { + jedis2.set("key", "value"); + totalOpsPerformed.getAndIncrement(); + } + jedis2.close(); }); + + // Record the total number of operations performed a second before we plan to sample. A poll + // interval less than the default of 100ms is used to increase the accuracy of the expected + // value, as the stats update the value of instantaneous per second values every 62.5ms + await().pollInterval(Duration.ofMillis(50)) + .until(() -> System.currentTimeMillis() >= timeToGetBaselineOpsPerformed); + int opsPerformedUntilASecondBeforeSampling = totalOpsPerformed.get(); + + // Calculate how many operations were performed in the last second. A poll interval less than + // the default of 100ms is used to increase the accuracy of the expected value, as the stats + // update the value of instantaneous per second values every 62.5ms + await().pollInterval(Duration.ofMillis(50)) + .until(() -> System.currentTimeMillis() >= timeToSampleAt); + int expected = totalOpsPerformed.get() - opsPerformedUntilASecondBeforeSampling; + double reportedCommandsPerLastSecond = Double.parseDouble(getInfo(jedis).get(OPS_PERFORMED_OVER_LAST_SECOND)); - long expected = numberOfCommandsExecuted.get() / NUMBER_SECONDS_TO_RUN; + assertThat(reportedCommandsPerLastSecond).isCloseTo(expected, withinPercentage(10)); - assertThat(reportedCommandsPerLastSecond).isCloseTo(expected, Offset.offset(4.0)); + executeCommands.get(GeodeAwaitility.getTimeout().toMillis(), TimeUnit.MILLISECONDS); - // if time passes w/o operations - await().during(NUMBER_SECONDS_TO_RUN, TimeUnit.SECONDS).until(() -> true); + // Wait two seconds with no operations + Thread.sleep(2000); - assertThat(Double.valueOf(getInfo(jedis).get(OPS_PERFORMED_OVER_LAST_SECOND))) - .isCloseTo(0.0, Offset.offset(1.0)); + // Confirm that instantaneous operations per second returns to zero when no operations are being + // performed, with a small offset to account for the info command being executed + assertThat(Double.parseDouble(getInfo(jedis).get(OPS_PERFORMED_OVER_LAST_SECOND))).isCloseTo(0, + offset(1.0)); } @Test @@ -193,39 +232,71 @@ public void networkBytesRead_shouldIncrementBySizeOfCommandSent() { } @Test - public void networkKiloBytesReadOverLastSecond_shouldBeCloseToBytesReadOverLastSecond() { - - double REASONABLE_SOUNDING_OFFSET = .8; - int NUMBER_SECONDS_TO_RUN = 5; - String RESP_COMMAND_STRING = "*3\r\n$3\r\nset\r\n$3\r\nkey\r\n$5\r\nvalue\r\n"; - int BYTES_SENT_PER_COMMAND = RESP_COMMAND_STRING.length(); + public void networkKiloBytesReadOverLastSecond_shouldBeCloseToBytesReadOverLastSecond() + throws InterruptedException, ExecutionException, TimeoutException { + int numberSecondsToRun = 4; + String command = "set"; + String key = "key"; + String value = "value"; + int bytesSentPerCommand = + ("*3\r\n$" + command.length() + "\r\n" + command + + "\r\n$" + key.length() + "\r\n" + key + + "\r\n$" + value.length() + "\r\n" + value + + "\r\n").length(); AtomicInteger totalBytesSent = new AtomicInteger(); - await().during(Duration.ofSeconds(NUMBER_SECONDS_TO_RUN)).until(() -> { - jedis.set("key", "value"); - totalBytesSent.addAndGet(BYTES_SENT_PER_COMMAND); - return true; + long startTime = System.currentTimeMillis(); + long endTime = startTime + Duration.ofSeconds(numberSecondsToRun).toMillis(); + + // Take a sample in the middle of performing operations to help eliminate warmup as a factor + long timeToSampleAt = startTime + Duration.ofSeconds(numberSecondsToRun / 2).toMillis(); + long timeToGetBaselineBytesSent = timeToSampleAt - Duration.ofSeconds(1).toMillis(); + + // Execute commands in the background + Future executeCommands = executor.submit(() -> { + Jedis jedis2 = new Jedis(BIND_ADDRESS, getPort(), REDIS_CLIENT_TIMEOUT); + while (System.currentTimeMillis() < endTime) { + jedis2.set(key, value); + totalBytesSent.addAndGet(bytesSentPerCommand); + } + jedis2.close(); }); - double actual_kbs = Double.parseDouble(getInfo(jedis).get(NETWORK_KB_READ_OVER_LAST_SECOND)); - double expected_kbs = ((double) totalBytesSent.get() / NUMBER_SECONDS_TO_RUN) / 1000; - assertThat(actual_kbs).isCloseTo(expected_kbs, Offset.offset(REASONABLE_SOUNDING_OFFSET)); + // Record the total number of KB sent a second before we plan to sample. A poll interval less + // than the default of 100ms is used to increase the accuracy of the expected value, as the + // stats update the value of instantaneous per second values every 62.5ms + await().pollInterval(Duration.ofMillis(50)) + .until(() -> System.currentTimeMillis() >= timeToGetBaselineBytesSent); + int bytesSentUntilASecondBeforeSampling = totalBytesSent.get(); + + // Calculate how many KB were sent in the last second. A poll interval less than the default of + // 100ms is used to increase the accuracy of the expected value, as the stats update the value + // of instantaneous per second values every 62.5ms + await().pollInterval(Duration.ofMillis(50)) + .until(() -> System.currentTimeMillis() >= timeToSampleAt); + double expected = (totalBytesSent.get() - bytesSentUntilASecondBeforeSampling) / 1024.0; + + double reportedKBReadPerLastSecond = + Double.parseDouble(getInfo(jedis).get(NETWORK_KB_READ_OVER_LAST_SECOND)); + + assertThat(reportedKBReadPerLastSecond).isCloseTo(expected, withinPercentage(10)); + + executeCommands.get(GeodeAwaitility.getTimeout().toMillis(), TimeUnit.MILLISECONDS); - // if time passes w/o operations - await().during(NUMBER_SECONDS_TO_RUN, TimeUnit.SECONDS) - .until(() -> true); + // Wait two seconds with no operations + Thread.sleep(2000); - // Kb/s should eventually drop to 0 or at least very close since just executing the info - // command may result in the value increasing. - assertThat(Double.valueOf(getInfo(jedis).get(NETWORK_KB_READ_OVER_LAST_SECOND))) - .isCloseTo(0.0, Offset.offset(0.1)); + // Confirm that instantaneous KB read per second returns to zero when no operations are being + // performed, with a small offset to account for the info command being executed + assertThat(Double.parseDouble(getInfo(jedis).get(NETWORK_KB_READ_OVER_LAST_SECOND))) + .isCloseTo(0, offset(0.02)); } // ------------------- Clients Section -------------------------- // @Test public void connectedClients_incrAndDecrWhenClientConnectsAndDisconnects() { - Jedis jedis2 = new Jedis("localhost", getPort(), TIMEOUT); + Jedis jedis2 = new Jedis("localhost", getPort(), REDIS_CLIENT_TIMEOUT); jedis2.ping(); validateConnectedClients(jedis, preTestConnectedClients, 1); @@ -237,7 +308,7 @@ public void connectedClients_incrAndDecrWhenClientConnectsAndDisconnects() { @Test public void totalConnectionsReceivedStat_shouldIncrement_whenNewConnectionOccurs() { - Jedis jedis2 = new Jedis("localhost", getPort(), TIMEOUT); + Jedis jedis2 = new Jedis("localhost", getPort(), REDIS_CLIENT_TIMEOUT); jedis2.ping(); validateConnectionsReceived(jedis, preTestConnectionsReceived, 1); @@ -276,7 +347,7 @@ public void uptimeInSeconds_shouldReturnTimeSinceStartInSeconds() { // ------------------- Helper Methods ----------------------------- // public long getStartTime() { - return START_TIME; + return startTime; } public long getCurrentTime() { diff --git a/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/server/InfoStatsIntegrationTest.java b/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/server/InfoStatsIntegrationTest.java index dd34917a87cb..adbf91dc76ba 100644 --- a/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/server/InfoStatsIntegrationTest.java +++ b/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/server/InfoStatsIntegrationTest.java @@ -20,7 +20,7 @@ import org.apache.geode.redis.GeodeRedisServerRule; -public class InfoStatsIntegrationTest extends AbstractRedisInfoStatsIntegrationTest { +public class InfoStatsIntegrationTest extends AbstractInfoStatsIntegrationTest { @ClassRule public static GeodeRedisServerRule server = new GeodeRedisServerRule(); diff --git a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/statistics/RedisStats.java b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/statistics/RedisStats.java index b4317d84f130..0af5ad59f99e 100644 --- a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/statistics/RedisStats.java +++ b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/statistics/RedisStats.java @@ -16,10 +16,11 @@ package org.apache.geode.redis.internal.statistics; -import static java.util.concurrent.TimeUnit.SECONDS; +import static java.util.concurrent.TimeUnit.MICROSECONDS; import static org.apache.geode.internal.statistics.StatisticsClockFactory.getTime; import static org.apache.geode.logging.internal.executors.LoggingExecutors.newSingleThreadScheduledExecutor; +import java.util.Arrays; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -37,14 +38,20 @@ public class RedisStats { private final AtomicLong keyspaceMisses = new AtomicLong(); private final AtomicLong uniqueChannelSubscriptions = new AtomicLong(); private final AtomicLong uniquePatternSubscriptions = new AtomicLong(); - private final ScheduledExecutorService perSecondExecutor; + + private final int rollingAverageSamplesPerSecond = 16; + private final ScheduledExecutorService rollingAverageExecutor; private volatile double networkKiloBytesReadOverLastSecond; - private volatile long opsPerformedLastTick; + private final long[] networkBytesReadOverLastNSamples = new long[rollingAverageSamplesPerSecond]; + private long totalNetworkBytesReadLastTick; private double opsPerformedOverLastSecond; - private long previousNetworkBytesRead; + private volatile long totalOpsPerformedLastTick; + private final long[] opsPerformedOverLastNSamples = new long[rollingAverageSamplesPerSecond]; + private int rollingAverageTick = 0; + private final StatisticsClock clock; private final GeodeRedisStats geodeRedisStats; - private final long START_TIME_IN_NANOS; + private final long startTimeInNanos; public RedisStats(StatisticsClock clock, @@ -52,8 +59,8 @@ public RedisStats(StatisticsClock clock, this.clock = clock; this.geodeRedisStats = geodeRedisStats; - perSecondExecutor = startPerSecondUpdater(); - START_TIME_IN_NANOS = clock.getTime(); + rollingAverageExecutor = startRollingAverageUpdater(); + startTimeInNanos = clock.getTime(); } public void incCommandsProcessed() { @@ -114,7 +121,7 @@ public long getCurrentTimeNanos() { } public long getUptimeInMilliseconds() { - long uptimeInNanos = getCurrentTimeNanos() - START_TIME_IN_NANOS; + long uptimeInNanos = getCurrentTimeNanos() - startTimeInNanos; return TimeUnit.NANOSECONDS.toMillis(uptimeInNanos); } @@ -194,43 +201,49 @@ public void changeUniquePatternSubscriptions(long delta) { public void close() { geodeRedisStats.close(); - stopPerSecondUpdater(); + stopRollingAverageUpdater(); } - private ScheduledExecutorService startPerSecondUpdater() { - int INTERVAL = 1; + private ScheduledExecutorService startRollingAverageUpdater() { + long microsPerSecond = 1_000_000; + final long delayMicros = microsPerSecond / rollingAverageSamplesPerSecond; - ScheduledExecutorService perSecondExecutor = - newSingleThreadScheduledExecutor("GemFireRedis-PerSecondUpdater-"); + ScheduledExecutorService rollingAverageExecutor = + newSingleThreadScheduledExecutor("GemFireRedis-RollingAverageStatUpdater-"); - perSecondExecutor.scheduleWithFixedDelay( - this::doPerSecondUpdates, - INTERVAL, - INTERVAL, - SECONDS); + rollingAverageExecutor.scheduleWithFixedDelay(this::doRollingAverageUpdates, delayMicros, + delayMicros, MICROSECONDS); - return perSecondExecutor; + return rollingAverageExecutor; } - private void stopPerSecondUpdater() { - perSecondExecutor.shutdownNow(); + private void stopRollingAverageUpdater() { + rollingAverageExecutor.shutdownNow(); } - private void doPerSecondUpdates() { - updateNetworkKilobytesReadLastSecond(); - updateOpsPerformedOverLastSecond(); + private void doRollingAverageUpdates() { + updateNetworkKilobytesReadLastSecond(rollingAverageTick); + updateOpsPerformedOverLastSecond(rollingAverageTick); + rollingAverageTick++; + if (rollingAverageTick >= rollingAverageSamplesPerSecond) { + rollingAverageTick = 0; + } } - private void updateNetworkKilobytesReadLastSecond() { + private void updateNetworkKilobytesReadLastSecond(int tickNumber) { final long totalNetworkBytesRead = getTotalNetworkBytesRead(); - long deltaNetworkBytesRead = totalNetworkBytesRead - previousNetworkBytesRead; - networkKiloBytesReadOverLastSecond = deltaNetworkBytesRead / 1024.0; - previousNetworkBytesRead = totalNetworkBytesRead; + long deltaNetworkBytesRead = totalNetworkBytesRead - totalNetworkBytesReadLastTick; + networkBytesReadOverLastNSamples[tickNumber] = deltaNetworkBytesRead; + networkKiloBytesReadOverLastSecond = + Arrays.stream(networkBytesReadOverLastNSamples).sum() / 1024.0; + totalNetworkBytesReadLastTick = totalNetworkBytesRead; } - private void updateOpsPerformedOverLastSecond() { + private void updateOpsPerformedOverLastSecond(int tickNumber) { final long totalOpsPerformed = getCommandsProcessed(); - opsPerformedOverLastSecond = totalOpsPerformed - opsPerformedLastTick; - opsPerformedLastTick = totalOpsPerformed; + long deltaOpsPerformed = totalOpsPerformed - totalOpsPerformedLastTick; + opsPerformedOverLastNSamples[tickNumber] = deltaOpsPerformed; + opsPerformedOverLastSecond = Arrays.stream(opsPerformedOverLastNSamples).sum(); + totalOpsPerformedLastTick = totalOpsPerformed; } }