diff --git a/geode-for-redis/src/acceptanceTest/java/org/apache/geode/redis/internal/commands/executor/server/InfoStatsNativeRedisAcceptanceTest.java b/geode-for-redis/src/acceptanceTest/java/org/apache/geode/redis/internal/commands/executor/server/InfoStatsNativeRedisAcceptanceTest.java index c60a43e20b0b..6e831f3565ac 100644 --- a/geode-for-redis/src/acceptanceTest/java/org/apache/geode/redis/internal/commands/executor/server/InfoStatsNativeRedisAcceptanceTest.java +++ b/geode-for-redis/src/acceptanceTest/java/org/apache/geode/redis/internal/commands/executor/server/InfoStatsNativeRedisAcceptanceTest.java @@ -20,7 +20,7 @@ import org.apache.geode.NativeRedisTestRule; -public class InfoStatsNativeRedisAcceptanceTest extends AbstractInfoStatsIntegrationTest { +public class InfoStatsNativeRedisAcceptanceTest extends AbstractRedisInfoStatsIntegrationTest { @ClassRule public static NativeRedisTestRule redis = new NativeRedisTestRule(); diff --git a/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/server/AbstractInfoStatsIntegrationTest.java b/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/server/AbstractRedisInfoStatsIntegrationTest.java similarity index 61% rename from geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/server/AbstractInfoStatsIntegrationTest.java rename to geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/server/AbstractRedisInfoStatsIntegrationTest.java index e72709b3708c..f32c6c06c2ef 100644 --- a/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/server/AbstractInfoStatsIntegrationTest.java +++ b/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/server/AbstractRedisInfoStatsIntegrationTest.java @@ -15,25 +15,17 @@ package org.apache.geode.redis.internal.commands.executor.server; import static org.apache.geode.test.awaitility.GeodeAwaitility.await; -import static org.apache.geode.test.dunit.rules.RedisClusterStartupRule.BIND_ADDRESS; -import static org.apache.geode.test.dunit.rules.RedisClusterStartupRule.REDIS_CLIENT_TIMEOUT; import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.offset; -import static org.assertj.core.api.Assertions.withinPercentage; import java.time.Duration; import java.util.Map; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import org.assertj.core.data.Offset; import org.junit.After; import org.junit.Before; import org.junit.BeforeClass; -import org.junit.Rule; import org.junit.Test; import redis.clients.jedis.Jedis; @@ -42,19 +34,17 @@ import org.apache.geode.redis.RedisIntegrationTest; import org.apache.geode.redis.RedisTestHelper; import org.apache.geode.test.awaitility.GeodeAwaitility; -import org.apache.geode.test.junit.rules.ExecutorServiceRule; -public abstract class AbstractInfoStatsIntegrationTest implements RedisIntegrationTest { - @Rule - public ExecutorServiceRule executor = new ExecutorServiceRule(); +public abstract class AbstractRedisInfoStatsIntegrationTest implements RedisIntegrationTest { + private static final int TIMEOUT = (int) GeodeAwaitility.getTimeout().toMillis(); private static final String EXISTING_HASH_KEY = "Existing_Hash"; private static final String EXISTING_STRING_KEY = "Existing_String"; private static final String EXISTING_SET_KEY_1 = "Existing_Set_1"; private static final String EXISTING_SET_KEY_2 = "Existing_Set_2"; private Jedis jedis; - private static long startTime; + private static long START_TIME; private static StatisticsClock statisticsClock; private long preTestConnectionsReceived = 0; @@ -83,12 +73,12 @@ public abstract class AbstractInfoStatsIntegrationTest implements RedisIntegrati @BeforeClass public static void beforeClass() { statisticsClock = new EnabledStatisticsClock(); - startTime = statisticsClock.getTime(); + START_TIME = statisticsClock.getTime(); } @Before public void before() { - jedis = new Jedis(BIND_ADDRESS, getPort(), REDIS_CLIENT_TIMEOUT); + jedis = new Jedis("localhost", getPort(), TIMEOUT); numInfoCalled.set(0); long preSetupCommandsProcessed = Long.parseLong(getInfo(jedis).get(COMMANDS_PROCESSED)); @@ -167,57 +157,27 @@ public void commandsProcessed_shouldIncrement_givenSuccessfulCommand() { } @Test - public void opsPerformedOverLastSecond_ShouldUpdate_givenOperationsOccurring() - throws InterruptedException, ExecutionException, TimeoutException { - final long numberSecondsToRun = 4; - AtomicInteger totalOpsPerformed = new AtomicInteger(); - - final long startTime = System.currentTimeMillis(); - final long endTime = startTime + Duration.ofSeconds(numberSecondsToRun).toMillis(); - - // Take a sample in the middle of performing operations to help eliminate warmup as a factor - final long timeToSampleAt = startTime + Duration.ofSeconds(numberSecondsToRun / 2).toMillis(); - final long timeToGetBaselineOpsPerformed = timeToSampleAt - Duration.ofSeconds(1).toMillis(); - - // Execute commands in the background - Future executeCommands = executor.submit(() -> { - Jedis jedis2 = new Jedis(BIND_ADDRESS, getPort(), REDIS_CLIENT_TIMEOUT); - while (System.currentTimeMillis() < endTime) { - jedis2.set("key", "value"); - totalOpsPerformed.getAndIncrement(); - } - jedis2.close(); + public void opsPerformedOverLastSecond_ShouldUpdate_givenOperationsOccurring() { + int NUMBER_SECONDS_TO_RUN = 10; + AtomicInteger numberOfCommandsExecuted = new AtomicInteger(); + + await().during(Duration.ofSeconds(NUMBER_SECONDS_TO_RUN)).until(() -> { + jedis.set("key", "value"); + numberOfCommandsExecuted.getAndIncrement(); + return true; }); - - // Record the total number of operations performed a second before we plan to sample. A poll - // interval less than the default of 100ms is used to increase the accuracy of the expected - // value, as the stats update the value of instantaneous per second values every 62.5ms - await().pollInterval(Duration.ofMillis(10)) - .until(() -> System.currentTimeMillis() >= timeToGetBaselineOpsPerformed); - final int opsPerformedUntilASecondBeforeSampling = totalOpsPerformed.get(); - - // Calculate how many operations were performed in the last second. A poll interval less than - // the default of 100ms is used to increase the accuracy of the expected value, as the stats - // update the value of instantaneous per second values every 62.5ms - await().pollInterval(Duration.ofMillis(10)) - .until(() -> System.currentTimeMillis() >= timeToSampleAt); - - final double reportedCommandsPerLastSecond = + double reportedCommandsPerLastSecond = Double.parseDouble(getInfo(jedis).get(OPS_PERFORMED_OVER_LAST_SECOND)); - final int expected = totalOpsPerformed.get() - opsPerformedUntilASecondBeforeSampling; + long expected = numberOfCommandsExecuted.get() / NUMBER_SECONDS_TO_RUN; - assertThat(reportedCommandsPerLastSecond).isCloseTo(expected, withinPercentage(12.5)); + assertThat(reportedCommandsPerLastSecond).isCloseTo(expected, Offset.offset(4.0)); - executeCommands.get(GeodeAwaitility.getTimeout().toMillis(), TimeUnit.MILLISECONDS); + // if time passes w/o operations + await().during(NUMBER_SECONDS_TO_RUN, TimeUnit.SECONDS).until(() -> true); - // Wait two seconds with no operations - Thread.sleep(2000); - - // Confirm that instantaneous operations per second returns to zero when no operations are being - // performed, with a small offset to account for the info command being executed - assertThat(Double.parseDouble(getInfo(jedis).get(OPS_PERFORMED_OVER_LAST_SECOND))).isCloseTo(0, - offset(1.0)); + assertThat(Double.valueOf(getInfo(jedis).get(OPS_PERFORMED_OVER_LAST_SECOND))) + .isCloseTo(0.0, Offset.offset(1.0)); } @Test @@ -233,72 +193,39 @@ public void networkBytesRead_shouldIncrementBySizeOfCommandSent() { } @Test - public void networkKiloBytesReadOverLastSecond_shouldBeCloseToBytesReadOverLastSecond() - throws InterruptedException, ExecutionException, TimeoutException { - final int numberSecondsToRun = 4; - final String command = "set"; - final String key = "key"; - final String value = "value"; - final int bytesSentPerCommand = - ("*3\r\n$" + command.length() + "\r\n" + command + - "\r\n$" + key.length() + "\r\n" + key + - "\r\n$" + value.length() + "\r\n" + value + - "\r\n").length(); + public void networkKiloBytesReadOverLastSecond_shouldBeCloseToBytesReadOverLastSecond() { + + double REASONABLE_SOUNDING_OFFSET = .8; + int NUMBER_SECONDS_TO_RUN = 5; + String RESP_COMMAND_STRING = "*3\r\n$3\r\nset\r\n$3\r\nkey\r\n$5\r\nvalue\r\n"; + int BYTES_SENT_PER_COMMAND = RESP_COMMAND_STRING.length(); AtomicInteger totalBytesSent = new AtomicInteger(); - final long startTime = System.currentTimeMillis(); - final long endTime = startTime + Duration.ofSeconds(numberSecondsToRun).toMillis(); - - // Take a sample in the middle of performing operations to help eliminate warmup as a factor - final long timeToSampleAt = startTime + Duration.ofSeconds(numberSecondsToRun / 2).toMillis(); - final long timeToGetBaselineBytesSent = timeToSampleAt - Duration.ofSeconds(1).toMillis(); - - // Execute commands in the background - Future executeCommands = executor.submit(() -> { - Jedis jedis2 = new Jedis(BIND_ADDRESS, getPort(), REDIS_CLIENT_TIMEOUT); - while (System.currentTimeMillis() < endTime) { - jedis2.set(key, value); - totalBytesSent.addAndGet(bytesSentPerCommand); - } - jedis2.close(); + await().during(Duration.ofSeconds(NUMBER_SECONDS_TO_RUN)).until(() -> { + jedis.set("key", "value"); + totalBytesSent.addAndGet(BYTES_SENT_PER_COMMAND); + return true; }); + double actual_kbs = Double.parseDouble(getInfo(jedis).get(NETWORK_KB_READ_OVER_LAST_SECOND)); + double expected_kbs = ((double) totalBytesSent.get() / NUMBER_SECONDS_TO_RUN) / 1000; - // Record the total number of KB sent a second before we plan to sample. A poll interval less - // than the default of 100ms is used to increase the accuracy of the expected value, as the - // stats update the value of instantaneous per second values every 62.5ms - await().pollInterval(Duration.ofMillis(10)) - .until(() -> System.currentTimeMillis() >= timeToGetBaselineBytesSent); - final int bytesSentUntilASecondBeforeSampling = totalBytesSent.get(); - - // Calculate how many KB were sent in the last second. A poll interval less than the default of - // 100ms is used to increase the accuracy of the expected value, as the stats update the value - // of instantaneous per second values every 62.5ms - await().pollInterval(Duration.ofMillis(10)) - .until(() -> System.currentTimeMillis() >= timeToSampleAt); - - final double reportedKBReadPerLastSecond = - Double.parseDouble(getInfo(jedis).get(NETWORK_KB_READ_OVER_LAST_SECOND)); - - final double expected = (totalBytesSent.get() - bytesSentUntilASecondBeforeSampling) / 1024.0; - - assertThat(reportedKBReadPerLastSecond).isCloseTo(expected, withinPercentage(12.5)); - - executeCommands.get(GeodeAwaitility.getTimeout().toMillis(), TimeUnit.MILLISECONDS); + assertThat(actual_kbs).isCloseTo(expected_kbs, Offset.offset(REASONABLE_SOUNDING_OFFSET)); - // Wait two seconds with no operations - Thread.sleep(2000); + // if time passes w/o operations + await().during(NUMBER_SECONDS_TO_RUN, TimeUnit.SECONDS) + .until(() -> true); - // Confirm that instantaneous KB read per second returns to zero when no operations are being - // performed, with a small offset to account for the info command being executed - assertThat(Double.parseDouble(getInfo(jedis).get(NETWORK_KB_READ_OVER_LAST_SECOND))) - .isCloseTo(0, offset(0.02)); + // Kb/s should eventually drop to 0 or at least very close since just executing the info + // command may result in the value increasing. + assertThat(Double.valueOf(getInfo(jedis).get(NETWORK_KB_READ_OVER_LAST_SECOND))) + .isCloseTo(0.0, Offset.offset(0.1)); } // ------------------- Clients Section -------------------------- // @Test public void connectedClients_incrAndDecrWhenClientConnectsAndDisconnects() { - Jedis jedis2 = new Jedis("localhost", getPort(), REDIS_CLIENT_TIMEOUT); + Jedis jedis2 = new Jedis("localhost", getPort(), TIMEOUT); jedis2.ping(); validateConnectedClients(jedis, preTestConnectedClients, 1); @@ -310,7 +237,7 @@ public void connectedClients_incrAndDecrWhenClientConnectsAndDisconnects() { @Test public void totalConnectionsReceivedStat_shouldIncrement_whenNewConnectionOccurs() { - Jedis jedis2 = new Jedis("localhost", getPort(), REDIS_CLIENT_TIMEOUT); + Jedis jedis2 = new Jedis("localhost", getPort(), TIMEOUT); jedis2.ping(); validateConnectionsReceived(jedis, preTestConnectionsReceived, 1); @@ -349,7 +276,7 @@ public void uptimeInSeconds_shouldReturnTimeSinceStartInSeconds() { // ------------------- Helper Methods ----------------------------- // public long getStartTime() { - return startTime; + return START_TIME; } public long getCurrentTime() { diff --git a/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/server/InfoStatsIntegrationTest.java b/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/server/InfoStatsIntegrationTest.java index 675c48dca1ae..dd34917a87cb 100644 --- a/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/server/InfoStatsIntegrationTest.java +++ b/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/server/InfoStatsIntegrationTest.java @@ -20,7 +20,7 @@ import org.apache.geode.redis.GeodeRedisServerRule; -public class InfoStatsIntegrationTest extends AbstractInfoStatsIntegrationTest { +public class InfoStatsIntegrationTest extends AbstractRedisInfoStatsIntegrationTest { @ClassRule public static GeodeRedisServerRule server = new GeodeRedisServerRule(); @@ -35,5 +35,7 @@ public int getExposedPort() { } @Override - public void configureMaxMemory(Jedis jedis) {} + public void configureMaxMemory(Jedis jedis) { + return; + } } diff --git a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/statistics/RedisStats.java b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/statistics/RedisStats.java index 91b8e7f0f980..35d955739ad9 100644 --- a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/statistics/RedisStats.java +++ b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/statistics/RedisStats.java @@ -16,18 +16,15 @@ package org.apache.geode.redis.internal.statistics; -import static java.util.concurrent.TimeUnit.MICROSECONDS; +import static java.util.concurrent.TimeUnit.SECONDS; import static org.apache.geode.internal.statistics.StatisticsClockFactory.getTime; import static org.apache.geode.logging.internal.executors.LoggingExecutors.newSingleThreadScheduledExecutor; -import java.util.Arrays; -import java.util.concurrent.Callable; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import org.apache.geode.internal.statistics.StatisticsClock; -import org.apache.geode.redis.internal.RedisException; import org.apache.geode.redis.internal.commands.RedisCommandType; public class RedisStats { @@ -40,25 +37,23 @@ public class RedisStats { private final AtomicLong keyspaceMisses = new AtomicLong(); private final AtomicLong uniqueChannelSubscriptions = new AtomicLong(); private final AtomicLong uniquePatternSubscriptions = new AtomicLong(); - - private final ScheduledExecutorService rollingAverageExecutor; - private static final int ROLLING_AVERAGE_SAMPLES_PER_SECOND = 16; - private final RollingAverageStat networkBytesReadRollingAverageStat = - new RollingAverageStat(ROLLING_AVERAGE_SAMPLES_PER_SECOND, this::getTotalNetworkBytesRead); + private final ScheduledExecutorService perSecondExecutor; private volatile double networkKiloBytesReadOverLastSecond; - private final RollingAverageStat opsPerformedRollingAverageStat = - new RollingAverageStat(ROLLING_AVERAGE_SAMPLES_PER_SECOND, this::getCommandsProcessed); - private volatile double opsPerformedOverLastSecond; - + private volatile long opsPerformedLastTick; + private double opsPerformedOverLastSecond; + private long previousNetworkBytesRead; private final StatisticsClock clock; private final GeodeRedisStats geodeRedisStats; - private final long startTimeInNanos; + private final long START_TIME_IN_NANOS; + + + public RedisStats(StatisticsClock clock, + GeodeRedisStats geodeRedisStats) { - public RedisStats(StatisticsClock clock, GeodeRedisStats geodeRedisStats) { this.clock = clock; this.geodeRedisStats = geodeRedisStats; - rollingAverageExecutor = startRollingAverageUpdater(); - startTimeInNanos = clock.getTime(); + perSecondExecutor = startPerSecondUpdater(); + START_TIME_IN_NANOS = clock.getTime(); } public void incCommandsProcessed() { @@ -119,7 +114,7 @@ public long getCurrentTimeNanos() { } public long getUptimeInMilliseconds() { - long uptimeInNanos = getCurrentTimeNanos() - startTimeInNanos; + long uptimeInNanos = getCurrentTimeNanos() - START_TIME_IN_NANOS; return TimeUnit.NANOSECONDS.toMillis(uptimeInNanos); } @@ -198,57 +193,43 @@ public void changeUniquePatternSubscriptions(long delta) { public void close() { geodeRedisStats.close(); - stopRollingAverageUpdater(); + stopPerSecondUpdater(); } - private ScheduledExecutorService startRollingAverageUpdater() { - long microsPerSecond = 1_000_000; - final long delayMicros = microsPerSecond / ROLLING_AVERAGE_SAMPLES_PER_SECOND; + private ScheduledExecutorService startPerSecondUpdater() { + int INTERVAL = 1; - ScheduledExecutorService rollingAverageExecutor = - newSingleThreadScheduledExecutor("GemFireRedis-RollingAverageStatUpdater-"); + ScheduledExecutorService perSecondExecutor = + newSingleThreadScheduledExecutor("GemFireRedis-PerSecondUpdater-"); - rollingAverageExecutor.scheduleWithFixedDelay(this::doRollingAverageUpdates, delayMicros, - delayMicros, MICROSECONDS); + perSecondExecutor.scheduleWithFixedDelay( + this::doPerSecondUpdates, + INTERVAL, + INTERVAL, + SECONDS); - return rollingAverageExecutor; + return perSecondExecutor; } - private void stopRollingAverageUpdater() { - rollingAverageExecutor.shutdownNow(); + private void stopPerSecondUpdater() { + perSecondExecutor.shutdownNow(); } - private void doRollingAverageUpdates() { - networkKiloBytesReadOverLastSecond = networkBytesReadRollingAverageStat.calculate() / 1024.0; - opsPerformedOverLastSecond = opsPerformedRollingAverageStat.calculate(); + private void doPerSecondUpdates() { + updateNetworkKilobytesReadLastSecond(); + updateOpsPerformedOverLastSecond(); } - private static class RollingAverageStat { - private int tickNumber = 0; - private long valueReadLastTick; - private final long[] valuesReadOverLastNSamples; - private final Callable statCallable; - - private RollingAverageStat(int samplesPerSecond, Callable getCurrentValue) { - valuesReadOverLastNSamples = new long[samplesPerSecond]; - statCallable = getCurrentValue; - } + private void updateNetworkKilobytesReadLastSecond() { + final long totalNetworkBytesRead = getTotalNetworkBytesRead(); + long deltaNetworkBytesRead = totalNetworkBytesRead - previousNetworkBytesRead; + networkKiloBytesReadOverLastSecond = deltaNetworkBytesRead / 1024.0; + previousNetworkBytesRead = totalNetworkBytesRead; + } - long calculate() { - long currentValue; - try { - currentValue = statCallable.call(); - } catch (Exception e) { - throw new RedisException("Error while calculating RollingAverage stats", e); - } - long delta = currentValue - valueReadLastTick; - valueReadLastTick = currentValue; - valuesReadOverLastNSamples[tickNumber] = delta; - tickNumber++; - if (tickNumber >= valuesReadOverLastNSamples.length) { - tickNumber = 0; - } - return Arrays.stream(valuesReadOverLastNSamples).sum(); - } + private void updateOpsPerformedOverLastSecond() { + final long totalOpsPerformed = getCommandsProcessed(); + opsPerformedOverLastSecond = totalOpsPerformed - opsPerformedLastTick; + opsPerformedLastTick = totalOpsPerformed; } }