diff --git a/geode-for-redis/src/acceptanceTest/java/org/apache/geode/redis/internal/commands/executor/server/InfoStatsNativeRedisAcceptanceTest.java b/geode-for-redis/src/acceptanceTest/java/org/apache/geode/redis/internal/commands/executor/server/InfoStatsNativeRedisAcceptanceTest.java index 6e831f3565ac..c60a43e20b0b 100644 --- a/geode-for-redis/src/acceptanceTest/java/org/apache/geode/redis/internal/commands/executor/server/InfoStatsNativeRedisAcceptanceTest.java +++ b/geode-for-redis/src/acceptanceTest/java/org/apache/geode/redis/internal/commands/executor/server/InfoStatsNativeRedisAcceptanceTest.java @@ -20,7 +20,7 @@ import org.apache.geode.NativeRedisTestRule; -public class InfoStatsNativeRedisAcceptanceTest extends AbstractRedisInfoStatsIntegrationTest { +public class InfoStatsNativeRedisAcceptanceTest extends AbstractInfoStatsIntegrationTest { @ClassRule public static NativeRedisTestRule redis = new NativeRedisTestRule(); diff --git a/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/server/AbstractRedisInfoStatsIntegrationTest.java b/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/server/AbstractInfoStatsIntegrationTest.java similarity index 61% rename from geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/server/AbstractRedisInfoStatsIntegrationTest.java rename to geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/server/AbstractInfoStatsIntegrationTest.java index f32c6c06c2ef..e72709b3708c 100644 --- a/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/server/AbstractRedisInfoStatsIntegrationTest.java +++ b/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/server/AbstractInfoStatsIntegrationTest.java @@ -15,17 +15,25 @@ package org.apache.geode.redis.internal.commands.executor.server; import static org.apache.geode.test.awaitility.GeodeAwaitility.await; +import static org.apache.geode.test.dunit.rules.RedisClusterStartupRule.BIND_ADDRESS; +import static org.apache.geode.test.dunit.rules.RedisClusterStartupRule.REDIS_CLIENT_TIMEOUT; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.offset; +import static org.assertj.core.api.Assertions.withinPercentage; import java.time.Duration; import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import org.assertj.core.data.Offset; import org.junit.After; import org.junit.Before; import org.junit.BeforeClass; +import org.junit.Rule; import org.junit.Test; import redis.clients.jedis.Jedis; @@ -34,17 +42,19 @@ import org.apache.geode.redis.RedisIntegrationTest; import org.apache.geode.redis.RedisTestHelper; import org.apache.geode.test.awaitility.GeodeAwaitility; +import org.apache.geode.test.junit.rules.ExecutorServiceRule; -public abstract class AbstractRedisInfoStatsIntegrationTest implements RedisIntegrationTest { +public abstract class AbstractInfoStatsIntegrationTest implements RedisIntegrationTest { + @Rule + public ExecutorServiceRule executor = new ExecutorServiceRule(); - private static final int TIMEOUT = (int) GeodeAwaitility.getTimeout().toMillis(); private static final String EXISTING_HASH_KEY = "Existing_Hash"; private static final String EXISTING_STRING_KEY = "Existing_String"; private static final String EXISTING_SET_KEY_1 = "Existing_Set_1"; private static final String EXISTING_SET_KEY_2 = "Existing_Set_2"; private Jedis jedis; - private static long START_TIME; + private static long startTime; private static StatisticsClock statisticsClock; private long preTestConnectionsReceived = 0; @@ -73,12 +83,12 @@ public abstract class AbstractRedisInfoStatsIntegrationTest implements RedisInte @BeforeClass public static void beforeClass() { statisticsClock = new EnabledStatisticsClock(); - START_TIME = statisticsClock.getTime(); + startTime = statisticsClock.getTime(); } @Before public void before() { - jedis = new Jedis("localhost", getPort(), TIMEOUT); + jedis = new Jedis(BIND_ADDRESS, getPort(), REDIS_CLIENT_TIMEOUT); numInfoCalled.set(0); long preSetupCommandsProcessed = Long.parseLong(getInfo(jedis).get(COMMANDS_PROCESSED)); @@ -157,27 +167,57 @@ public void commandsProcessed_shouldIncrement_givenSuccessfulCommand() { } @Test - public void opsPerformedOverLastSecond_ShouldUpdate_givenOperationsOccurring() { - int NUMBER_SECONDS_TO_RUN = 10; - AtomicInteger numberOfCommandsExecuted = new AtomicInteger(); - - await().during(Duration.ofSeconds(NUMBER_SECONDS_TO_RUN)).until(() -> { - jedis.set("key", "value"); - numberOfCommandsExecuted.getAndIncrement(); - return true; + public void opsPerformedOverLastSecond_ShouldUpdate_givenOperationsOccurring() + throws InterruptedException, ExecutionException, TimeoutException { + final long numberSecondsToRun = 4; + AtomicInteger totalOpsPerformed = new AtomicInteger(); + + final long startTime = System.currentTimeMillis(); + final long endTime = startTime + Duration.ofSeconds(numberSecondsToRun).toMillis(); + + // Take a sample in the middle of performing operations to help eliminate warmup as a factor + final long timeToSampleAt = startTime + Duration.ofSeconds(numberSecondsToRun / 2).toMillis(); + final long timeToGetBaselineOpsPerformed = timeToSampleAt - Duration.ofSeconds(1).toMillis(); + + // Execute commands in the background + Future executeCommands = executor.submit(() -> { + Jedis jedis2 = new Jedis(BIND_ADDRESS, getPort(), REDIS_CLIENT_TIMEOUT); + while (System.currentTimeMillis() < endTime) { + jedis2.set("key", "value"); + totalOpsPerformed.getAndIncrement(); + } + jedis2.close(); }); - double reportedCommandsPerLastSecond = + + // Record the total number of operations performed a second before we plan to sample. A poll + // interval less than the default of 100ms is used to increase the accuracy of the expected + // value, as the stats update the value of instantaneous per second values every 62.5ms + await().pollInterval(Duration.ofMillis(10)) + .until(() -> System.currentTimeMillis() >= timeToGetBaselineOpsPerformed); + final int opsPerformedUntilASecondBeforeSampling = totalOpsPerformed.get(); + + // Calculate how many operations were performed in the last second. A poll interval less than + // the default of 100ms is used to increase the accuracy of the expected value, as the stats + // update the value of instantaneous per second values every 62.5ms + await().pollInterval(Duration.ofMillis(10)) + .until(() -> System.currentTimeMillis() >= timeToSampleAt); + + final double reportedCommandsPerLastSecond = Double.parseDouble(getInfo(jedis).get(OPS_PERFORMED_OVER_LAST_SECOND)); - long expected = numberOfCommandsExecuted.get() / NUMBER_SECONDS_TO_RUN; + final int expected = totalOpsPerformed.get() - opsPerformedUntilASecondBeforeSampling; - assertThat(reportedCommandsPerLastSecond).isCloseTo(expected, Offset.offset(4.0)); + assertThat(reportedCommandsPerLastSecond).isCloseTo(expected, withinPercentage(12.5)); - // if time passes w/o operations - await().during(NUMBER_SECONDS_TO_RUN, TimeUnit.SECONDS).until(() -> true); + executeCommands.get(GeodeAwaitility.getTimeout().toMillis(), TimeUnit.MILLISECONDS); - assertThat(Double.valueOf(getInfo(jedis).get(OPS_PERFORMED_OVER_LAST_SECOND))) - .isCloseTo(0.0, Offset.offset(1.0)); + // Wait two seconds with no operations + Thread.sleep(2000); + + // Confirm that instantaneous operations per second returns to zero when no operations are being + // performed, with a small offset to account for the info command being executed + assertThat(Double.parseDouble(getInfo(jedis).get(OPS_PERFORMED_OVER_LAST_SECOND))).isCloseTo(0, + offset(1.0)); } @Test @@ -193,39 +233,72 @@ public void networkBytesRead_shouldIncrementBySizeOfCommandSent() { } @Test - public void networkKiloBytesReadOverLastSecond_shouldBeCloseToBytesReadOverLastSecond() { - - double REASONABLE_SOUNDING_OFFSET = .8; - int NUMBER_SECONDS_TO_RUN = 5; - String RESP_COMMAND_STRING = "*3\r\n$3\r\nset\r\n$3\r\nkey\r\n$5\r\nvalue\r\n"; - int BYTES_SENT_PER_COMMAND = RESP_COMMAND_STRING.length(); + public void networkKiloBytesReadOverLastSecond_shouldBeCloseToBytesReadOverLastSecond() + throws InterruptedException, ExecutionException, TimeoutException { + final int numberSecondsToRun = 4; + final String command = "set"; + final String key = "key"; + final String value = "value"; + final int bytesSentPerCommand = + ("*3\r\n$" + command.length() + "\r\n" + command + + "\r\n$" + key.length() + "\r\n" + key + + "\r\n$" + value.length() + "\r\n" + value + + "\r\n").length(); AtomicInteger totalBytesSent = new AtomicInteger(); - await().during(Duration.ofSeconds(NUMBER_SECONDS_TO_RUN)).until(() -> { - jedis.set("key", "value"); - totalBytesSent.addAndGet(BYTES_SENT_PER_COMMAND); - return true; + final long startTime = System.currentTimeMillis(); + final long endTime = startTime + Duration.ofSeconds(numberSecondsToRun).toMillis(); + + // Take a sample in the middle of performing operations to help eliminate warmup as a factor + final long timeToSampleAt = startTime + Duration.ofSeconds(numberSecondsToRun / 2).toMillis(); + final long timeToGetBaselineBytesSent = timeToSampleAt - Duration.ofSeconds(1).toMillis(); + + // Execute commands in the background + Future executeCommands = executor.submit(() -> { + Jedis jedis2 = new Jedis(BIND_ADDRESS, getPort(), REDIS_CLIENT_TIMEOUT); + while (System.currentTimeMillis() < endTime) { + jedis2.set(key, value); + totalBytesSent.addAndGet(bytesSentPerCommand); + } + jedis2.close(); }); - double actual_kbs = Double.parseDouble(getInfo(jedis).get(NETWORK_KB_READ_OVER_LAST_SECOND)); - double expected_kbs = ((double) totalBytesSent.get() / NUMBER_SECONDS_TO_RUN) / 1000; - assertThat(actual_kbs).isCloseTo(expected_kbs, Offset.offset(REASONABLE_SOUNDING_OFFSET)); + // Record the total number of KB sent a second before we plan to sample. A poll interval less + // than the default of 100ms is used to increase the accuracy of the expected value, as the + // stats update the value of instantaneous per second values every 62.5ms + await().pollInterval(Duration.ofMillis(10)) + .until(() -> System.currentTimeMillis() >= timeToGetBaselineBytesSent); + final int bytesSentUntilASecondBeforeSampling = totalBytesSent.get(); + + // Calculate how many KB were sent in the last second. A poll interval less than the default of + // 100ms is used to increase the accuracy of the expected value, as the stats update the value + // of instantaneous per second values every 62.5ms + await().pollInterval(Duration.ofMillis(10)) + .until(() -> System.currentTimeMillis() >= timeToSampleAt); + + final double reportedKBReadPerLastSecond = + Double.parseDouble(getInfo(jedis).get(NETWORK_KB_READ_OVER_LAST_SECOND)); + + final double expected = (totalBytesSent.get() - bytesSentUntilASecondBeforeSampling) / 1024.0; + + assertThat(reportedKBReadPerLastSecond).isCloseTo(expected, withinPercentage(12.5)); + + executeCommands.get(GeodeAwaitility.getTimeout().toMillis(), TimeUnit.MILLISECONDS); - // if time passes w/o operations - await().during(NUMBER_SECONDS_TO_RUN, TimeUnit.SECONDS) - .until(() -> true); + // Wait two seconds with no operations + Thread.sleep(2000); - // Kb/s should eventually drop to 0 or at least very close since just executing the info - // command may result in the value increasing. - assertThat(Double.valueOf(getInfo(jedis).get(NETWORK_KB_READ_OVER_LAST_SECOND))) - .isCloseTo(0.0, Offset.offset(0.1)); + // Confirm that instantaneous KB read per second returns to zero when no operations are being + // performed, with a small offset to account for the info command being executed + assertThat(Double.parseDouble(getInfo(jedis).get(NETWORK_KB_READ_OVER_LAST_SECOND))) + .isCloseTo(0, offset(0.02)); } // ------------------- Clients Section -------------------------- // @Test public void connectedClients_incrAndDecrWhenClientConnectsAndDisconnects() { - Jedis jedis2 = new Jedis("localhost", getPort(), TIMEOUT); + Jedis jedis2 = new Jedis("localhost", getPort(), REDIS_CLIENT_TIMEOUT); jedis2.ping(); validateConnectedClients(jedis, preTestConnectedClients, 1); @@ -237,7 +310,7 @@ public void connectedClients_incrAndDecrWhenClientConnectsAndDisconnects() { @Test public void totalConnectionsReceivedStat_shouldIncrement_whenNewConnectionOccurs() { - Jedis jedis2 = new Jedis("localhost", getPort(), TIMEOUT); + Jedis jedis2 = new Jedis("localhost", getPort(), REDIS_CLIENT_TIMEOUT); jedis2.ping(); validateConnectionsReceived(jedis, preTestConnectionsReceived, 1); @@ -276,7 +349,7 @@ public void uptimeInSeconds_shouldReturnTimeSinceStartInSeconds() { // ------------------- Helper Methods ----------------------------- // public long getStartTime() { - return START_TIME; + return startTime; } public long getCurrentTime() { diff --git a/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/server/InfoStatsIntegrationTest.java b/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/server/InfoStatsIntegrationTest.java index dd34917a87cb..675c48dca1ae 100644 --- a/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/server/InfoStatsIntegrationTest.java +++ b/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/server/InfoStatsIntegrationTest.java @@ -20,7 +20,7 @@ import org.apache.geode.redis.GeodeRedisServerRule; -public class InfoStatsIntegrationTest extends AbstractRedisInfoStatsIntegrationTest { +public class InfoStatsIntegrationTest extends AbstractInfoStatsIntegrationTest { @ClassRule public static GeodeRedisServerRule server = new GeodeRedisServerRule(); @@ -35,7 +35,5 @@ public int getExposedPort() { } @Override - public void configureMaxMemory(Jedis jedis) { - return; - } + public void configureMaxMemory(Jedis jedis) {} } diff --git a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/statistics/RedisStats.java b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/statistics/RedisStats.java index 35d955739ad9..91b8e7f0f980 100644 --- a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/statistics/RedisStats.java +++ b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/statistics/RedisStats.java @@ -16,15 +16,18 @@ package org.apache.geode.redis.internal.statistics; -import static java.util.concurrent.TimeUnit.SECONDS; +import static java.util.concurrent.TimeUnit.MICROSECONDS; import static org.apache.geode.internal.statistics.StatisticsClockFactory.getTime; import static org.apache.geode.logging.internal.executors.LoggingExecutors.newSingleThreadScheduledExecutor; +import java.util.Arrays; +import java.util.concurrent.Callable; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import org.apache.geode.internal.statistics.StatisticsClock; +import org.apache.geode.redis.internal.RedisException; import org.apache.geode.redis.internal.commands.RedisCommandType; public class RedisStats { @@ -37,23 +40,25 @@ public class RedisStats { private final AtomicLong keyspaceMisses = new AtomicLong(); private final AtomicLong uniqueChannelSubscriptions = new AtomicLong(); private final AtomicLong uniquePatternSubscriptions = new AtomicLong(); - private final ScheduledExecutorService perSecondExecutor; + + private final ScheduledExecutorService rollingAverageExecutor; + private static final int ROLLING_AVERAGE_SAMPLES_PER_SECOND = 16; + private final RollingAverageStat networkBytesReadRollingAverageStat = + new RollingAverageStat(ROLLING_AVERAGE_SAMPLES_PER_SECOND, this::getTotalNetworkBytesRead); private volatile double networkKiloBytesReadOverLastSecond; - private volatile long opsPerformedLastTick; - private double opsPerformedOverLastSecond; - private long previousNetworkBytesRead; + private final RollingAverageStat opsPerformedRollingAverageStat = + new RollingAverageStat(ROLLING_AVERAGE_SAMPLES_PER_SECOND, this::getCommandsProcessed); + private volatile double opsPerformedOverLastSecond; + private final StatisticsClock clock; private final GeodeRedisStats geodeRedisStats; - private final long START_TIME_IN_NANOS; - - - public RedisStats(StatisticsClock clock, - GeodeRedisStats geodeRedisStats) { + private final long startTimeInNanos; + public RedisStats(StatisticsClock clock, GeodeRedisStats geodeRedisStats) { this.clock = clock; this.geodeRedisStats = geodeRedisStats; - perSecondExecutor = startPerSecondUpdater(); - START_TIME_IN_NANOS = clock.getTime(); + rollingAverageExecutor = startRollingAverageUpdater(); + startTimeInNanos = clock.getTime(); } public void incCommandsProcessed() { @@ -114,7 +119,7 @@ public long getCurrentTimeNanos() { } public long getUptimeInMilliseconds() { - long uptimeInNanos = getCurrentTimeNanos() - START_TIME_IN_NANOS; + long uptimeInNanos = getCurrentTimeNanos() - startTimeInNanos; return TimeUnit.NANOSECONDS.toMillis(uptimeInNanos); } @@ -193,43 +198,57 @@ public void changeUniquePatternSubscriptions(long delta) { public void close() { geodeRedisStats.close(); - stopPerSecondUpdater(); + stopRollingAverageUpdater(); } - private ScheduledExecutorService startPerSecondUpdater() { - int INTERVAL = 1; + private ScheduledExecutorService startRollingAverageUpdater() { + long microsPerSecond = 1_000_000; + final long delayMicros = microsPerSecond / ROLLING_AVERAGE_SAMPLES_PER_SECOND; - ScheduledExecutorService perSecondExecutor = - newSingleThreadScheduledExecutor("GemFireRedis-PerSecondUpdater-"); + ScheduledExecutorService rollingAverageExecutor = + newSingleThreadScheduledExecutor("GemFireRedis-RollingAverageStatUpdater-"); - perSecondExecutor.scheduleWithFixedDelay( - this::doPerSecondUpdates, - INTERVAL, - INTERVAL, - SECONDS); + rollingAverageExecutor.scheduleWithFixedDelay(this::doRollingAverageUpdates, delayMicros, + delayMicros, MICROSECONDS); - return perSecondExecutor; + return rollingAverageExecutor; } - private void stopPerSecondUpdater() { - perSecondExecutor.shutdownNow(); + private void stopRollingAverageUpdater() { + rollingAverageExecutor.shutdownNow(); } - private void doPerSecondUpdates() { - updateNetworkKilobytesReadLastSecond(); - updateOpsPerformedOverLastSecond(); + private void doRollingAverageUpdates() { + networkKiloBytesReadOverLastSecond = networkBytesReadRollingAverageStat.calculate() / 1024.0; + opsPerformedOverLastSecond = opsPerformedRollingAverageStat.calculate(); } - private void updateNetworkKilobytesReadLastSecond() { - final long totalNetworkBytesRead = getTotalNetworkBytesRead(); - long deltaNetworkBytesRead = totalNetworkBytesRead - previousNetworkBytesRead; - networkKiloBytesReadOverLastSecond = deltaNetworkBytesRead / 1024.0; - previousNetworkBytesRead = totalNetworkBytesRead; - } + private static class RollingAverageStat { + private int tickNumber = 0; + private long valueReadLastTick; + private final long[] valuesReadOverLastNSamples; + private final Callable statCallable; + + private RollingAverageStat(int samplesPerSecond, Callable getCurrentValue) { + valuesReadOverLastNSamples = new long[samplesPerSecond]; + statCallable = getCurrentValue; + } - private void updateOpsPerformedOverLastSecond() { - final long totalOpsPerformed = getCommandsProcessed(); - opsPerformedOverLastSecond = totalOpsPerformed - opsPerformedLastTick; - opsPerformedLastTick = totalOpsPerformed; + long calculate() { + long currentValue; + try { + currentValue = statCallable.call(); + } catch (Exception e) { + throw new RedisException("Error while calculating RollingAverage stats", e); + } + long delta = currentValue - valueReadLastTick; + valueReadLastTick = currentValue; + valuesReadOverLastNSamples[tickNumber] = delta; + tickNumber++; + if (tickNumber >= valuesReadOverLastNSamples.length) { + tickNumber = 0; + } + return Arrays.stream(valuesReadOverLastNSamples).sum(); + } } }