Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MINOR: Fix rate metric spikes #15889

Merged
merged 21 commits into from
May 29, 2024
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public long windowSize(MetricConfig config, long now) {
* but this approach does not account for sleeps. SampledStat only creates samples whenever record is called,
* if no record is called for a period of time that time is not accounted for in windowSize and produces incorrect results.
*/
long totalElapsedTimeMs = now - stat.oldest(now).lastWindowMs;
long totalElapsedTimeMs = now - stat.oldest(now).startTimeMs;
emitskevich-blp marked this conversation as resolved.
Show resolved Hide resolved
// Check how many full windows of data we have currently retained
int numFullWindows = (int) (totalElapsedTimeMs / config.timeWindowMs());
int minFullWindows = config.samples() - 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public void record(MetricConfig config, double value, long timeMs) {
sample = advance(config, timeMs);
update(sample, config, value, timeMs);
sample.eventCount += 1;
sample.lastEventMs = timeMs;
}

private Sample advance(MetricConfig config, long timeMs) {
Expand Down Expand Up @@ -87,7 +88,7 @@ public Sample oldest(long now) {
Sample oldest = this.samples.get(0);
for (int i = 1; i < this.samples.size(); i++) {
Sample curr = this.samples.get(i);
if (curr.lastWindowMs < oldest.lastWindowMs)
if (curr.startTimeMs < oldest.startTimeMs)
oldest = curr;
}
return oldest;
Expand All @@ -110,40 +111,43 @@ public String toString() {
protected void purgeObsoleteSamples(MetricConfig config, long now) {
long expireAge = config.samples() * config.timeWindowMs();
for (Sample sample : samples) {
if (now - sample.lastWindowMs >= expireAge)
if (now - sample.lastEventMs >= expireAge)
emitskevich-blp marked this conversation as resolved.
Show resolved Hide resolved
sample.reset(now);
}
}

protected static class Sample {
public double initialValue;
public long eventCount;
public long lastWindowMs;
public long startTimeMs;
emitskevich-blp marked this conversation as resolved.
Show resolved Hide resolved
public long lastEventMs;
public double value;

public Sample(double initialValue, long now) {
this.initialValue = initialValue;
this.eventCount = 0;
this.lastWindowMs = now;
this.startTimeMs = now;
this.lastEventMs = now;
this.value = initialValue;
}

public void reset(long now) {
this.eventCount = 0;
this.lastWindowMs = now;
this.startTimeMs = now;
this.lastEventMs = now;
this.value = initialValue;
}

public boolean isComplete(long timeMs, MetricConfig config) {
return timeMs - lastWindowMs >= config.timeWindowMs() || eventCount >= config.eventWindow();
return timeMs - startTimeMs >= config.timeWindowMs() || eventCount >= config.eventWindow();
emitskevich-blp marked this conversation as resolved.
Show resolved Hide resolved
}

@Override
public String toString() {
return "Sample(" +
"value=" + value +
", eventCount=" + eventCount +
", lastWindowMs=" + lastWindowMs +
", startTimeMs=" + startTimeMs +
emitskevich-blp marked this conversation as resolved.
Show resolved Hide resolved
", initialValue=" + initialValue +
')';
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public class SimpleRate extends Rate {
@Override
public long windowSize(MetricConfig config, long now) {
stat.purgeObsoleteSamples(config, now);
long elapsed = now - stat.oldest(now).lastWindowMs;
long elapsed = now - stat.oldest(now).startTimeMs;
return Math.max(elapsed, config.timeWindowMs());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,23 +21,28 @@
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;

import static java.util.concurrent.TimeUnit.SECONDS;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class RateTest {
private static final double EPS = 0.000001;
private Rate r;
private Time timeClock;
private Rate rate;
private Time time;

@BeforeEach
public void setup() {
r = new Rate();
timeClock = new MockTime();
rate = new Rate();
time = new MockTime();
}

// Tests the scenario where the recording and measurement is done before the window for first sample finishes
Expand All @@ -48,12 +53,12 @@ public void testRateWithNoPriorAvailableSamples(int numSample, int sampleWindowS
final MetricConfig config = new MetricConfig().samples(numSample).timeWindow(sampleWindowSizeSec, TimeUnit.SECONDS);
final double sampleValue = 50.0;
// record at beginning of the window
r.record(config, sampleValue, timeClock.milliseconds());
rate.record(config, sampleValue, time.milliseconds());
// forward time till almost the end of window
final long measurementTime = TimeUnit.SECONDS.toMillis(sampleWindowSizeSec) - 1;
timeClock.sleep(measurementTime);
time.sleep(measurementTime);
// calculate rate at almost the end of window
final double observedRate = r.measure(config, timeClock.milliseconds());
final double observedRate = rate.measure(config, time.milliseconds());
assertFalse(Double.isNaN(observedRate));

// In a scenario where sufficient number of samples is not available yet, the rate calculation algorithm assumes
Expand All @@ -64,4 +69,30 @@ public void testRateWithNoPriorAvailableSamples(int numSample, int sampleWindowS
double expectedRatePerSec = sampleValue / windowSize;
assertEquals(expectedRatePerSec, observedRate, EPS);
}

// Record an event every 100 ms on average, moving some 1 ms back or forth for fine-grained
// window control. The expected rate, hence, is 10-11 events/sec depending on the moment of
// measurement. Start assertions from the second window.
@Test
public void testRateIsConsistentAfterTheFirstWindow() {
MetricConfig config = new MetricConfig().timeWindow(1, SECONDS).samples(2);
List<Integer> steps = Arrays.asList(0, 99, 100, 100, 100, 100, 100, 100, 100, 100, 100);

// start the first window and record events at 0,99,199,...,999 ms
for (int stepMs : steps) {
time.sleep(stepMs);
rate.record(config, 1, time.milliseconds());
}

// making a gap of 100 ms between windows
time.sleep(101);

// start the second window and record events at 0,99,199,...,999 ms
for (int stepMs : steps) {
time.sleep(stepMs);
rate.record(config, 1, time.milliseconds());
double observedRate = rate.measure(config, time.milliseconds());
emitskevich-blp marked this conversation as resolved.
Show resolved Hide resolved
assertTrue(10 <= observedRate && observedRate <= 11);
}
}
}