Skip to content

Commit

Permalink
GEODE-10010: Calculate per-second Redis stats as rolling average
Browse files Browse the repository at this point in the history
 - Rather than returning a value that updates once per second,
 instantaneous operations per second and instantaneous kilobytes read per
 second now return a rolling average of those stats that is updated 16
 times per second
 - Rename AbstractRedisInfoStatsIntegrationTest to match child classes
 - In the tests, rather than measuring the instantaneous per-second stats
 after operations have finished, sample them while operations are ongoing
 and calculate the expected value from the number of operations performed
 in the one second prior to sampling.

Authored-by: Donal Evans <[email protected]>
  • Loading branch information
DonalEvans committed Feb 11, 2022
1 parent fbe4e66 commit 35e4494
Show file tree
Hide file tree
Showing 4 changed files with 159 additions and 75 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

import org.apache.geode.NativeRedisTestRule;

public class InfoStatsNativeRedisAcceptanceTest extends AbstractRedisInfoStatsIntegrationTest {
public class InfoStatsNativeRedisAcceptanceTest extends AbstractInfoStatsIntegrationTest {
@ClassRule
public static NativeRedisTestRule redis = new NativeRedisTestRule();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,25 @@
package org.apache.geode.redis.internal.commands.executor.server;

import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
import static org.apache.geode.test.dunit.rules.RedisClusterStartupRule.BIND_ADDRESS;
import static org.apache.geode.test.dunit.rules.RedisClusterStartupRule.REDIS_CLIENT_TIMEOUT;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.offset;
import static org.assertj.core.api.Assertions.withinPercentage;

import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

import org.assertj.core.data.Offset;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import redis.clients.jedis.Jedis;

Expand All @@ -34,17 +42,19 @@
import org.apache.geode.redis.RedisIntegrationTest;
import org.apache.geode.redis.RedisTestHelper;
import org.apache.geode.test.awaitility.GeodeAwaitility;
import org.apache.geode.test.junit.rules.ExecutorServiceRule;

public abstract class AbstractRedisInfoStatsIntegrationTest implements RedisIntegrationTest {
public abstract class AbstractInfoStatsIntegrationTest implements RedisIntegrationTest {
@Rule
public ExecutorServiceRule executor = new ExecutorServiceRule();

private static final int TIMEOUT = (int) GeodeAwaitility.getTimeout().toMillis();
private static final String EXISTING_HASH_KEY = "Existing_Hash";
private static final String EXISTING_STRING_KEY = "Existing_String";
private static final String EXISTING_SET_KEY_1 = "Existing_Set_1";
private static final String EXISTING_SET_KEY_2 = "Existing_Set_2";

private Jedis jedis;
private static long START_TIME;
private static long startTime;
private static StatisticsClock statisticsClock;

private long preTestConnectionsReceived = 0;
Expand Down Expand Up @@ -73,12 +83,12 @@ public abstract class AbstractRedisInfoStatsIntegrationTest implements RedisInte
// Class-level setup (JUnit @BeforeClass): creates the statistics clock and records its current
// time as the start time that getStartTime() returns.
// NOTE(review): this is a diff view; the START_TIME and startTime assignments below appear to be
// the removed and added versions of the same line (field renamed), not two real statements.
@BeforeClass
public static void beforeClass() {
statisticsClock = new EnabledStatisticsClock();
START_TIME = statisticsClock.getTime();
startTime = statisticsClock.getTime();
}

@Before
public void before() {
jedis = new Jedis("localhost", getPort(), TIMEOUT);
jedis = new Jedis(BIND_ADDRESS, getPort(), REDIS_CLIENT_TIMEOUT);
numInfoCalled.set(0);

long preSetupCommandsProcessed = Long.parseLong(getInfo(jedis).get(COMMANDS_PROCESSED));
Expand Down Expand Up @@ -157,27 +167,56 @@ public void commandsProcessed_shouldIncrement_givenSuccessfulCommand() {
}

// Checks the OPS_PERFORMED_OVER_LAST_SECOND value reported by the info command.
// NOTE(review): this span is a diff hunk. Lines of the previous implementation (the
// NUMBER_SECONDS_TO_RUN / numberOfCommandsExecuted / await().during(...) lines and the
// Offset.offset(...) assertions) are interleaved with the lines that appear to replace them,
// so the span is not compilable as shown.
// The numberSecondsToRun version runs SET commands on a second connection in the background,
// counts the operations performed in the second before the sampling time, and expects the
// reported stat to be within 10% of that count. Once the background task has finished and two
// idle seconds have passed, it expects the stat to be within 1.0 of zero.
@Test
public void opsPerformedOverLastSecond_ShouldUpdate_givenOperationsOccurring() {
int NUMBER_SECONDS_TO_RUN = 10;
AtomicInteger numberOfCommandsExecuted = new AtomicInteger();

await().during(Duration.ofSeconds(NUMBER_SECONDS_TO_RUN)).until(() -> {
jedis.set("key", "value");
numberOfCommandsExecuted.getAndIncrement();
return true;
public void opsPerformedOverLastSecond_ShouldUpdate_givenOperationsOccurring()
throws InterruptedException, ExecutionException, TimeoutException {
long numberSecondsToRun = 4;
AtomicInteger totalOpsPerformed = new AtomicInteger();

// NOTE(review): this local shadows the static startTime field of the test class.
long startTime = System.currentTimeMillis();
long endTime = startTime + Duration.ofSeconds(numberSecondsToRun).toMillis();

// Take a sample in the middle of performing operations to help eliminate warmup as a factor
long timeToSampleAt = startTime + Duration.ofSeconds(numberSecondsToRun / 2).toMillis();
long timeToGetBaselineOpsPerformed = timeToSampleAt - Duration.ofSeconds(1).toMillis();

// Execute commands in the background on a separate connection until endTime is reached.
// NOTE(review): jedis2 is closed only when the loop exits normally; if set() throws, close() is
// not reached.
Future<Void> executeCommands = executor.submit(() -> {
Jedis jedis2 = new Jedis(BIND_ADDRESS, getPort(), REDIS_CLIENT_TIMEOUT);
while (System.currentTimeMillis() < endTime) {
jedis2.set("key", "value");
totalOpsPerformed.getAndIncrement();
}
jedis2.close();
});

// Record the total number of operations performed a second before we plan to sample. A poll
// interval less than the default of 100ms is used to increase the accuracy of the expected
// value, as the instantaneous per-second stats are recalculated every 62.5ms
await().pollInterval(Duration.ofMillis(50))
.until(() -> System.currentTimeMillis() >= timeToGetBaselineOpsPerformed);
int opsPerformedUntilASecondBeforeSampling = totalOpsPerformed.get();

// Calculate how many operations were performed in the last second. A poll interval less than
// the default of 100ms is used to increase the accuracy of the expected value, as the
// instantaneous per-second stats are recalculated every 62.5ms
await().pollInterval(Duration.ofMillis(50))
.until(() -> System.currentTimeMillis() >= timeToSampleAt);
int expected = totalOpsPerformed.get() - opsPerformedUntilASecondBeforeSampling;

double reportedCommandsPerLastSecond =
Double.parseDouble(getInfo(jedis).get(OPS_PERFORMED_OVER_LAST_SECOND));

long expected = numberOfCommandsExecuted.get() / NUMBER_SECONDS_TO_RUN;
assertThat(reportedCommandsPerLastSecond).isCloseTo(expected, withinPercentage(10));

assertThat(reportedCommandsPerLastSecond).isCloseTo(expected, Offset.offset(4.0));
// Wait for the background task to finish, failing if it exceeds the Geode awaitility timeout.
executeCommands.get(GeodeAwaitility.getTimeout().toMillis(), TimeUnit.MILLISECONDS);

// If time passes without operations
await().during(NUMBER_SECONDS_TO_RUN, TimeUnit.SECONDS).until(() -> true);
// Wait two seconds with no operations
Thread.sleep(2000);

assertThat(Double.valueOf(getInfo(jedis).get(OPS_PERFORMED_OVER_LAST_SECOND)))
.isCloseTo(0.0, Offset.offset(1.0));
// Confirm that instantaneous operations per second returns to zero when no operations are being
// performed, with a small offset to account for the info command being executed
assertThat(Double.parseDouble(getInfo(jedis).get(OPS_PERFORMED_OVER_LAST_SECOND))).isCloseTo(0,
offset(1.0));
}

@Test
Expand All @@ -193,39 +232,71 @@ public void networkBytesRead_shouldIncrementBySizeOfCommandSent() {
}

// Checks the NETWORK_KB_READ_OVER_LAST_SECOND value reported by the info command.
// NOTE(review): this span is a diff hunk. Lines of the previous implementation (the
// REASONABLE_SOUNDING_OFFSET / NUMBER_SECONDS_TO_RUN / RESP_COMMAND_STRING lines, the
// await().during(...) calls and the Offset.offset(...) assertions) are interleaved with the
// lines that appear to replace them, so the span is not compilable as shown.
// The numberSecondsToRun version runs SET commands on a second connection in the background,
// adds the RESP-encoded length of each command to a byte counter, converts the bytes sent in the
// second before the sampling time to KB (dividing by 1024), and expects the reported stat to be
// within 10% of that value. Once the background task has finished and two idle seconds have
// passed, it expects the stat to be within 0.02 of zero.
@Test
public void networkKiloBytesReadOverLastSecond_shouldBeCloseToBytesReadOverLastSecond() {

double REASONABLE_SOUNDING_OFFSET = .8;
int NUMBER_SECONDS_TO_RUN = 5;
String RESP_COMMAND_STRING = "*3\r\n$3\r\nset\r\n$3\r\nkey\r\n$5\r\nvalue\r\n";
int BYTES_SENT_PER_COMMAND = RESP_COMMAND_STRING.length();
public void networkKiloBytesReadOverLastSecond_shouldBeCloseToBytesReadOverLastSecond()
throws InterruptedException, ExecutionException, TimeoutException {
int numberSecondsToRun = 4;
String command = "set";
String key = "key";
String value = "value";
// Length of the RESP array sent for "set key value": "*3", then a "$<length>" header and the
// text of each of the three parts, every element terminated by "\r\n".
int bytesSentPerCommand =
("*3\r\n$" + command.length() + "\r\n" + command +
"\r\n$" + key.length() + "\r\n" + key +
"\r\n$" + value.length() + "\r\n" + value +
"\r\n").length();
AtomicInteger totalBytesSent = new AtomicInteger();

await().during(Duration.ofSeconds(NUMBER_SECONDS_TO_RUN)).until(() -> {
jedis.set("key", "value");
totalBytesSent.addAndGet(BYTES_SENT_PER_COMMAND);
return true;
// NOTE(review): this local shadows the static startTime field of the test class.
long startTime = System.currentTimeMillis();
long endTime = startTime + Duration.ofSeconds(numberSecondsToRun).toMillis();

// Take a sample in the middle of performing operations to help eliminate warmup as a factor
long timeToSampleAt = startTime + Duration.ofSeconds(numberSecondsToRun / 2).toMillis();
long timeToGetBaselineBytesSent = timeToSampleAt - Duration.ofSeconds(1).toMillis();

// Execute commands in the background on a separate connection until endTime is reached.
// NOTE(review): jedis2 is closed only when the loop exits normally; if set() throws, close() is
// not reached.
Future<Void> executeCommands = executor.submit(() -> {
Jedis jedis2 = new Jedis(BIND_ADDRESS, getPort(), REDIS_CLIENT_TIMEOUT);
while (System.currentTimeMillis() < endTime) {
jedis2.set(key, value);
totalBytesSent.addAndGet(bytesSentPerCommand);
}
jedis2.close();
});
double actual_kbs = Double.parseDouble(getInfo(jedis).get(NETWORK_KB_READ_OVER_LAST_SECOND));
double expected_kbs = ((double) totalBytesSent.get() / NUMBER_SECONDS_TO_RUN) / 1000;

assertThat(actual_kbs).isCloseTo(expected_kbs, Offset.offset(REASONABLE_SOUNDING_OFFSET));
// Record the total number of bytes sent a second before we plan to sample. A poll interval less
// than the default of 100ms is used to increase the accuracy of the expected value, as the
// instantaneous per-second stats are recalculated every 62.5ms
await().pollInterval(Duration.ofMillis(50))
.until(() -> System.currentTimeMillis() >= timeToGetBaselineBytesSent);
int bytesSentUntilASecondBeforeSampling = totalBytesSent.get();

// Calculate how many KB were sent in the last second. A poll interval less than the default of
// 100ms is used to increase the accuracy of the expected value, as the instantaneous
// per-second stats are recalculated every 62.5ms
await().pollInterval(Duration.ofMillis(50))
.until(() -> System.currentTimeMillis() >= timeToSampleAt);
double expected = (totalBytesSent.get() - bytesSentUntilASecondBeforeSampling) / 1024.0;

double reportedKBReadPerLastSecond =
Double.parseDouble(getInfo(jedis).get(NETWORK_KB_READ_OVER_LAST_SECOND));

assertThat(reportedKBReadPerLastSecond).isCloseTo(expected, withinPercentage(10));

// Wait for the background task to finish, failing if it exceeds the Geode awaitility timeout.
executeCommands.get(GeodeAwaitility.getTimeout().toMillis(), TimeUnit.MILLISECONDS);

// If time passes without operations
await().during(NUMBER_SECONDS_TO_RUN, TimeUnit.SECONDS)
.until(() -> true);
// Wait two seconds with no operations
Thread.sleep(2000);

// KB/s should eventually drop to 0, or at least very close to it, since just executing the info
// command may result in the value increasing.
assertThat(Double.valueOf(getInfo(jedis).get(NETWORK_KB_READ_OVER_LAST_SECOND)))
.isCloseTo(0.0, Offset.offset(0.1));
// Confirm that instantaneous KB read per second returns to zero when no operations are being
// performed, with a small offset to account for the info command being executed
assertThat(Double.parseDouble(getInfo(jedis).get(NETWORK_KB_READ_OVER_LAST_SECOND)))
.isCloseTo(0, offset(0.02));
}

// ------------------- Clients Section -------------------------- //

@Test
public void connectedClients_incrAndDecrWhenClientConnectsAndDisconnects() {
Jedis jedis2 = new Jedis("localhost", getPort(), TIMEOUT);
Jedis jedis2 = new Jedis("localhost", getPort(), REDIS_CLIENT_TIMEOUT);
jedis2.ping();

validateConnectedClients(jedis, preTestConnectedClients, 1);
Expand All @@ -237,7 +308,7 @@ public void connectedClients_incrAndDecrWhenClientConnectsAndDisconnects() {

@Test
public void totalConnectionsReceivedStat_shouldIncrement_whenNewConnectionOccurs() {
Jedis jedis2 = new Jedis("localhost", getPort(), TIMEOUT);
Jedis jedis2 = new Jedis("localhost", getPort(), REDIS_CLIENT_TIMEOUT);
jedis2.ping();

validateConnectionsReceived(jedis, preTestConnectionsReceived, 1);
Expand Down Expand Up @@ -276,7 +347,7 @@ public void uptimeInSeconds_shouldReturnTimeSinceStartInSeconds() {

// ------------------- Helper Methods ----------------------------- //
// Returns the statistics-clock reading captured in beforeClass().
// NOTE(review): diff view; the two return statements appear to be the removed (START_TIME) and
// added (startTime) versions of the same line.
public long getStartTime() {
return START_TIME;
return startTime;
}

public long getCurrentTime() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

import org.apache.geode.redis.GeodeRedisServerRule;

public class InfoStatsIntegrationTest extends AbstractRedisInfoStatsIntegrationTest {
public class InfoStatsIntegrationTest extends AbstractInfoStatsIntegrationTest {
@ClassRule
public static GeodeRedisServerRule server = new GeodeRedisServerRule();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@

package org.apache.geode.redis.internal.statistics;

import static java.util.concurrent.TimeUnit.SECONDS;
import static java.util.concurrent.TimeUnit.MICROSECONDS;
import static org.apache.geode.internal.statistics.StatisticsClockFactory.getTime;
import static org.apache.geode.logging.internal.executors.LoggingExecutors.newSingleThreadScheduledExecutor;

import java.util.Arrays;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
Expand All @@ -37,23 +38,29 @@ public class RedisStats {
private final AtomicLong keyspaceMisses = new AtomicLong();
private final AtomicLong uniqueChannelSubscriptions = new AtomicLong();
private final AtomicLong uniquePatternSubscriptions = new AtomicLong();
// NOTE(review): diff view; perSecondExecutor, opsPerformedLastTick, previousNetworkBytesRead and
// START_TIME_IN_NANOS appear to be removed declarations shown next to their replacements.
private final ScheduledExecutorService perSecondExecutor;

// Number of slots in each sample array, and the divisor used to derive the updater's delay
// (1_000_000 microseconds / 16), so the slots together cover roughly one second.
private final int rollingAverageSamplesPerSecond = 16;
// Single-thread scheduler that runs doRollingAverageUpdates(); shut down in close().
private final ScheduledExecutorService rollingAverageExecutor;
// Sum of the byte samples divided by 1024.0; written by the updater.
private volatile double networkKiloBytesReadOverLastSecond;
private volatile long opsPerformedLastTick;
// Bytes read during each of the most recent sampling intervals, indexed by tick.
private final long[] networkBytesReadOverLastNSamples = new long[rollingAverageSamplesPerSecond];
// Value of the total-bytes-read counter at the previous update.
private long totalNetworkBytesReadLastTick;
// Sum of the operation samples; written by the updater.
// NOTE(review): not volatile, unlike networkKiloBytesReadOverLastSecond; confirm which threads
// read it.
private double opsPerformedOverLastSecond;
private long previousNetworkBytesRead;
// Value of the commands-processed counter at the previous update.
private volatile long totalOpsPerformedLastTick;
// Operations performed during each of the most recent sampling intervals, indexed by tick.
private final long[] opsPerformedOverLastNSamples = new long[rollingAverageSamplesPerSecond];
// Slot the next update writes to; wraps back to 0 on reaching rollingAverageSamplesPerSecond.
private int rollingAverageTick = 0;

private final StatisticsClock clock;
private final GeodeRedisStats geodeRedisStats;
private final long START_TIME_IN_NANOS;
// Clock reading taken in the constructor; used to compute uptime.
private final long startTimeInNanos;


// Stores the clock and the Geode stats object, starts the background stat updater, and records
// the clock's current time as the start time used by getUptimeInMilliseconds().
// NOTE(review): diff view; the perSecondExecutor and START_TIME_IN_NANOS assignments appear to
// be the removed versions of the rollingAverageExecutor and startTimeInNanos lines after them.
// NOTE(review): the updater is started before startTimeInNanos is assigned.
public RedisStats(StatisticsClock clock,
GeodeRedisStats geodeRedisStats) {

this.clock = clock;
this.geodeRedisStats = geodeRedisStats;
perSecondExecutor = startPerSecondUpdater();
START_TIME_IN_NANOS = clock.getTime();
rollingAverageExecutor = startRollingAverageUpdater();
startTimeInNanos = clock.getTime();
}

public void incCommandsProcessed() {
Expand Down Expand Up @@ -114,7 +121,7 @@ public long getCurrentTimeNanos() {
}

// Returns the time elapsed since the start time recorded in the constructor, converted from
// nanoseconds to milliseconds.
// NOTE(review): diff view; the two uptimeInNanos lines appear to be the removed
// (START_TIME_IN_NANOS) and added (startTimeInNanos) versions of the same statement.
public long getUptimeInMilliseconds() {
long uptimeInNanos = getCurrentTimeNanos() - START_TIME_IN_NANOS;
long uptimeInNanos = getCurrentTimeNanos() - startTimeInNanos;
return TimeUnit.NANOSECONDS.toMillis(uptimeInNanos);
}

Expand Down Expand Up @@ -194,43 +201,49 @@ public void changeUniquePatternSubscriptions(long delta) {

// Closes the underlying Geode stats object and then stops the background stat updater.
// NOTE(review): diff view; stopPerSecondUpdater() appears to be the removed call that
// stopRollingAverageUpdater() replaces.
public void close() {
geodeRedisStats.close();
stopPerSecondUpdater();
stopRollingAverageUpdater();
}

// Creates a single-thread scheduled executor and schedules the periodic stat update on it, then
// returns the executor so the caller can shut it down later.
// NOTE(review): diff view; the startPerSecondUpdater / INTERVAL / perSecondExecutor lines (a
// fixed delay of 1 second) appear to be the removed implementation. The lines interleaved with
// them schedule doRollingAverageUpdates with an initial and subsequent delay of
// 1_000_000 / rollingAverageSamplesPerSecond microseconds.
private ScheduledExecutorService startPerSecondUpdater() {
int INTERVAL = 1;
private ScheduledExecutorService startRollingAverageUpdater() {
long microsPerSecond = 1_000_000;
final long delayMicros = microsPerSecond / rollingAverageSamplesPerSecond;

ScheduledExecutorService perSecondExecutor =
newSingleThreadScheduledExecutor("GemFireRedis-PerSecondUpdater-");
ScheduledExecutorService rollingAverageExecutor =
newSingleThreadScheduledExecutor("GemFireRedis-RollingAverageStatUpdater-");

perSecondExecutor.scheduleWithFixedDelay(
this::doPerSecondUpdates,
INTERVAL,
INTERVAL,
SECONDS);
// Fixed delay: the interval is measured from the end of one run to the start of the next.
rollingAverageExecutor.scheduleWithFixedDelay(this::doRollingAverageUpdates, delayMicros,
delayMicros, MICROSECONDS);

return perSecondExecutor;
return rollingAverageExecutor;
}

// Stops the background stat updater by calling shutdownNow() on its executor.
// NOTE(review): diff view; the stopPerSecondUpdater / perSecondExecutor lines appear to be the
// removed versions of the stopRollingAverageUpdater / rollingAverageExecutor lines.
private void stopPerSecondUpdater() {
perSecondExecutor.shutdownNow();
private void stopRollingAverageUpdater() {
rollingAverageExecutor.shutdownNow();
}

// Periodic task run by the updater executor: refreshes both per-second stats for the current
// tick, then advances the tick, wrapping it to 0 once it reaches rollingAverageSamplesPerSecond.
// NOTE(review): diff view; the doPerSecondUpdates signature and the two no-argument update calls
// appear to be the removed implementation.
private void doPerSecondUpdates() {
updateNetworkKilobytesReadLastSecond();
updateOpsPerformedOverLastSecond();
private void doRollingAverageUpdates() {
updateNetworkKilobytesReadLastSecond(rollingAverageTick);
updateOpsPerformedOverLastSecond(rollingAverageTick);
rollingAverageTick++;
if (rollingAverageTick >= rollingAverageSamplesPerSecond) {
rollingAverageTick = 0;
}
}

// Refreshes networkKiloBytesReadOverLastSecond.
// The tickNumber version stores the bytes read since the previous update in slot tickNumber of
// networkBytesReadOverLastNSamples, sets the stat to the sum of all slots divided by 1024.0, and
// remembers the current total for the next update.
// NOTE(review): diff view; the no-argument signature and the previousNetworkBytesRead lines
// appear to be the removed implementation, which used only the latest delta.
private void updateNetworkKilobytesReadLastSecond() {
private void updateNetworkKilobytesReadLastSecond(int tickNumber) {
final long totalNetworkBytesRead = getTotalNetworkBytesRead();
long deltaNetworkBytesRead = totalNetworkBytesRead - previousNetworkBytesRead;
networkKiloBytesReadOverLastSecond = deltaNetworkBytesRead / 1024.0;
previousNetworkBytesRead = totalNetworkBytesRead;
long deltaNetworkBytesRead = totalNetworkBytesRead - totalNetworkBytesReadLastTick;
networkBytesReadOverLastNSamples[tickNumber] = deltaNetworkBytesRead;
networkKiloBytesReadOverLastSecond =
Arrays.stream(networkBytesReadOverLastNSamples).sum() / 1024.0;
totalNetworkBytesReadLastTick = totalNetworkBytesRead;
}

// Refreshes opsPerformedOverLastSecond.
// The tickNumber version stores the commands processed since the previous update in slot
// tickNumber of opsPerformedOverLastNSamples, sets the stat to the sum of all slots, and
// remembers the current total for the next update.
// NOTE(review): diff view; the no-argument signature and the opsPerformedLastTick lines appear
// to be the removed implementation, which used only the latest delta.
private void updateOpsPerformedOverLastSecond() {
private void updateOpsPerformedOverLastSecond(int tickNumber) {
final long totalOpsPerformed = getCommandsProcessed();
opsPerformedOverLastSecond = totalOpsPerformed - opsPerformedLastTick;
opsPerformedLastTick = totalOpsPerformed;
long deltaOpsPerformed = totalOpsPerformed - totalOpsPerformedLastTick;
opsPerformedOverLastNSamples[tickNumber] = deltaOpsPerformed;
opsPerformedOverLastSecond = Arrays.stream(opsPerformedOverLastNSamples).sum();
totalOpsPerformedLastTick = totalOpsPerformed;
}
}

0 comments on commit 35e4494

Please sign in to comment.