Skip to content

Commit

Permalink
MINOR: Fix rate metric spikes (apache#15889)
Browse files Browse the repository at this point in the history
Rate reports value in the form of sumOrCount/monitoredWindowSize. It has a bug in monitoredWindowSize calculation, which leads to spikes in result values.

Reviewers: Jun Rao <[email protected]>
  • Loading branch information
emitskevich-blp authored and wernerdv committed Jun 3, 2024
1 parent db90e63 commit 962f771
Show file tree
Hide file tree
Showing 8 changed files with 250 additions and 96 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
*/
public class MetricConfig {

public static final int DEFAULT_NUM_SAMPLES = 2;

private Quota quota;
private int samples;
private long eventWindow;
Expand All @@ -34,7 +36,7 @@ public class MetricConfig {

public MetricConfig() {
this.quota = null;
this.samples = 2;
this.samples = DEFAULT_NUM_SAMPLES;
this.eventWindow = Long.MAX_VALUE;
this.timeWindowMs = TimeUnit.MILLISECONDS.convert(30, TimeUnit.SECONDS);
this.tags = new LinkedHashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,15 +74,18 @@ public long windowSize(MetricConfig config, long now) {
/*
* Here we check the total amount of time elapsed since the oldest non-obsolete window.
* This give the total windowSize of the batch which is the time used for Rate computation.
* However, there is an issue if we do not have sufficient data for e.g. if only 1 second has elapsed in a 30 second
* However, there is an issue if we do not have sufficient data for e.g. if only 1 second has elapsed in a 30-second
* window, the measured rate will be very high.
* Hence we assume that the elapsed time is always N-1 complete windows plus whatever fraction of the final window is complete.
* Hence, we assume that the elapsed time is always N-1 complete windows plus whatever fraction of the final window is complete.
*
* Note that we could simply count the amount of time elapsed in the current window and add n-1 windows to get the total time,
* but this approach does not account for sleeps. SampledStat only creates samples whenever record is called,
* if no record is called for a period of time that time is not accounted for in windowSize and produces incorrect results.
*
* Note also, that totalElapsedTimeMs can be larger than the monitored window size,
* if the oldest sample started before the window while overlapping it.
*/
long totalElapsedTimeMs = now - stat.oldest(now).lastWindowMs;
long totalElapsedTimeMs = now - stat.oldest(now).startTimeMs;
// Check how many full windows of data we have currently retained
int numFullWindows = (int) (totalElapsedTimeMs / config.timeWindowMs());
int minFullWindows = config.samples() - 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ public abstract class SampledStat implements MeasurableStat {

public SampledStat(double initialValue) {
this.initialValue = initialValue;
this.samples = new ArrayList<>(2);
// keep one extra placeholder for "overlapping sample" (see purgeObsoleteSamples() logic)
this.samples = new ArrayList<>(MetricConfig.DEFAULT_NUM_SAMPLES + 1);
}

@Override
Expand All @@ -50,10 +51,13 @@ public void record(MetricConfig config, double value, long timeMs) {
sample = advance(config, timeMs);
update(sample, config, value, timeMs);
sample.eventCount += 1;
sample.lastEventMs = timeMs;
}

private Sample advance(MetricConfig config, long timeMs) {
this.current = (this.current + 1) % config.samples();
// keep one extra placeholder for "overlapping sample" (see purgeObsoleteSamples() logic)
int maxSamples = config.samples() + 1;
this.current = (this.current + 1) % maxSamples;
if (this.current >= samples.size()) {
Sample sample = newSample(timeMs);
this.samples.add(sample);
Expand Down Expand Up @@ -87,7 +91,7 @@ public Sample oldest(long now) {
Sample oldest = this.samples.get(0);
for (int i = 1; i < this.samples.size(); i++) {
Sample curr = this.samples.get(i);
if (curr.lastWindowMs < oldest.lastWindowMs)
if (curr.startTimeMs < oldest.startTimeMs)
oldest = curr;
}
return oldest;
Expand All @@ -106,44 +110,51 @@ public String toString() {

public abstract double combine(List<Sample> samples, MetricConfig config, long now);

/* Timeout any windows that have expired in the absence of any events */
// purge any samples that lack observed events within the monitored window
protected void purgeObsoleteSamples(MetricConfig config, long now) {
long expireAge = config.samples() * config.timeWindowMs();
for (Sample sample : samples) {
if (now - sample.lastWindowMs >= expireAge)
// samples overlapping the monitored window are kept,
// even if they started before it
if (now - sample.lastEventMs >= expireAge) {
sample.reset(now);
}
}
}

protected static class Sample {
public double initialValue;
public long eventCount;
public long lastWindowMs;
public long startTimeMs;
public long lastEventMs;
public double value;

public Sample(double initialValue, long now) {
this.initialValue = initialValue;
this.eventCount = 0;
this.lastWindowMs = now;
this.startTimeMs = now;
this.lastEventMs = now;
this.value = initialValue;
}

public void reset(long now) {
this.eventCount = 0;
this.lastWindowMs = now;
this.startTimeMs = now;
this.lastEventMs = now;
this.value = initialValue;
}

public boolean isComplete(long timeMs, MetricConfig config) {
return timeMs - lastWindowMs >= config.timeWindowMs() || eventCount >= config.eventWindow();
return timeMs - startTimeMs >= config.timeWindowMs() || eventCount >= config.eventWindow();
}

@Override
public String toString() {
return "Sample(" +
"value=" + value +
", eventCount=" + eventCount +
", lastWindowMs=" + lastWindowMs +
", startTimeMs=" + startTimeMs +
", lastEventMs=" + lastEventMs +
", initialValue=" + initialValue +
')';
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public class SimpleRate extends Rate {
@Override
public long windowSize(MetricConfig config, long now) {
stat.purgeObsoleteSamples(config, now);
long elapsed = now - stat.oldest(now).lastWindowMs;
long elapsed = now - stat.oldest(now).startTimeMs;
return Math.max(elapsed, config.timeWindowMs());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -336,17 +336,6 @@ public void testRemoveMetric() {
assertEquals(size, metrics.metrics().size());
}

@Test
public void testEventWindowing() {
WindowedCount count = new WindowedCount();
MetricConfig config = new MetricConfig().eventWindow(1).samples(2);
count.record(config, 1.0, time.milliseconds());
count.record(config, 1.0, time.milliseconds());
assertEquals(2.0, count.measure(config, time.milliseconds()), EPS);
count.record(config, 1.0, time.milliseconds()); // first event times out
assertEquals(2.0, count.measure(config, time.milliseconds()), EPS);
}

@Test
public void testTimeWindowing() {
WindowedCount count = new WindowedCount();
Expand Down Expand Up @@ -475,28 +464,13 @@ public void testPercentiles() {
Metric p50 = this.metrics.metrics().get(metrics.metricName("test.p50", "grp1"));
Metric p75 = this.metrics.metrics().get(metrics.metricName("test.p75", "grp1"));

// record two windows worth of sequential values
for (int i = 0; i < buckets; i++)
sensor.record(i);

assertEquals(25, (Double) p25.metricValue(), 1.0);
assertEquals(50, (Double) p50.metricValue(), 1.0);
assertEquals(75, (Double) p75.metricValue(), 1.0);

for (int i = 0; i < buckets; i++)
sensor.record(0.0);

assertEquals(0.0, (Double) p25.metricValue(), 1.0);
assertEquals(0.0, (Double) p50.metricValue(), 1.0);
assertEquals(0.0, (Double) p75.metricValue(), 1.0);

// record two more windows worth of sequential values
// record 100 sequential values
for (int i = 0; i < buckets; i++)
sensor.record(i);

assertEquals(25, (Double) p25.metricValue(), 1.0);
assertEquals(50, (Double) p50.metricValue(), 1.0);
assertEquals(75, (Double) p75.metricValue(), 1.0);
assertEquals(25, (Double) p25.metricValue());
assertEquals(50, (Double) p50.metricValue());
assertEquals(75, (Double) p75.metricValue());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
*/
package org.apache.kafka.common.metrics.stats;

import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.CompoundStat.NamedMeasurable;
import org.apache.kafka.common.metrics.JmxReporter;
Expand Down Expand Up @@ -73,14 +72,14 @@ public void testMoreFrequencyParametersThanBuckets() {
}

@Test
public void testBooleanFrequencies() {
MetricName metricTrue = name("true");
MetricName metricFalse = name("false");
public void testBooleanFrequenciesStrategy1() {
MetricName metricTrue = metricName("true");
MetricName metricFalse = metricName("false");
Frequencies frequencies = Frequencies.forBooleanValues(metricFalse, metricTrue);
final NamedMeasurable falseMetric = frequencies.stats().get(0);
final NamedMeasurable trueMetric = frequencies.stats().get(1);

// Record 2 windows worth of values
// Record 25 "false" and 75 "true"
for (int i = 0; i != 25; ++i) {
frequencies.record(config, 0.0, time.milliseconds());
}
Expand All @@ -89,8 +88,17 @@ public void testBooleanFrequencies() {
}
assertEquals(0.25, falseMetric.stat().measure(config, time.milliseconds()), DELTA);
assertEquals(0.75, trueMetric.stat().measure(config, time.milliseconds()), DELTA);
}

@Test
public void testBooleanFrequenciesStrategy2() {
MetricName metricTrue = metricName("true");
MetricName metricFalse = metricName("false");
Frequencies frequencies = Frequencies.forBooleanValues(metricFalse, metricTrue);
final NamedMeasurable falseMetric = frequencies.stats().get(0);
final NamedMeasurable trueMetric = frequencies.stats().get(1);

// Record 2 more windows worth of values
// Record 40 "false" and 60 "true"
for (int i = 0; i != 40; ++i) {
frequencies.record(config, 0.0, time.milliseconds());
}
Expand All @@ -102,58 +110,69 @@ public void testBooleanFrequencies() {
}

@Test
public void testUseWithMetrics() {
MetricName name1 = name("1");
MetricName name2 = name("2");
MetricName name3 = name("3");
MetricName name4 = name("4");
Frequencies frequencies = new Frequencies(4, 1.0, 4.0,
new Frequency(name1, 1.0),
new Frequency(name2, 2.0),
new Frequency(name3, 3.0),
new Frequency(name4, 4.0));
public void testWithMetricsStrategy1() {
Frequencies frequencies = new Frequencies(4, 1.0, 4.0, freq("1", 1.0),
freq("2", 2.0), freq("3", 3.0), freq("4", 4.0));
Sensor sensor = metrics.sensor("test", config);
sensor.add(frequencies);
Metric metric1 = this.metrics.metrics().get(name1);
Metric metric2 = this.metrics.metrics().get(name2);
Metric metric3 = this.metrics.metrics().get(name3);
Metric metric4 = this.metrics.metrics().get(name4);

// Record 2 windows worth of values
for (int i = 0; i != 100; ++i) {
// Record 100 events uniformly between all buckets
for (int i = 0; i < 100; ++i) {
frequencies.record(config, i % 4 + 1, time.milliseconds());
}
assertEquals(0.25, (Double) metric1.metricValue(), DELTA);
assertEquals(0.25, (Double) metric2.metricValue(), DELTA);
assertEquals(0.25, (Double) metric3.metricValue(), DELTA);
assertEquals(0.25, (Double) metric4.metricValue(), DELTA);
assertEquals(0.25, metricValue("1"), DELTA);
assertEquals(0.25, metricValue("2"), DELTA);
assertEquals(0.25, metricValue("3"), DELTA);
assertEquals(0.25, metricValue("4"), DELTA);
}

@Test
public void testWithMetricsStrategy2() {
Frequencies frequencies = new Frequencies(4, 1.0, 4.0, freq("1", 1.0),
freq("2", 2.0), freq("3", 3.0), freq("4", 4.0));
Sensor sensor = metrics.sensor("test", config);
sensor.add(frequencies);

// Record 100 events half-half between 1st and 2nd buckets
for (int i = 0; i < 100; ++i) {
frequencies.record(config, i % 2 + 1, time.milliseconds());
}
assertEquals(0.50, metricValue("1"), DELTA);
assertEquals(0.50, metricValue("2"), DELTA);
assertEquals(0.00, metricValue("3"), DELTA);
assertEquals(0.00, metricValue("4"), DELTA);
}

// Record 2 windows worth of values
for (int i = 0; i != 100; ++i) {
@Test
public void testWithMetricsStrategy3() {
Frequencies frequencies = new Frequencies(4, 1.0, 4.0, freq("1", 1.0),
freq("2", 2.0), freq("3", 3.0), freq("4", 4.0));
Sensor sensor = metrics.sensor("test", config);
sensor.add(frequencies);

// Record 50 events half-half between 1st and 2nd buckets
for (int i = 0; i < 50; ++i) {
frequencies.record(config, i % 2 + 1, time.milliseconds());
}
assertEquals(0.50, (Double) metric1.metricValue(), DELTA);
assertEquals(0.50, (Double) metric2.metricValue(), DELTA);
assertEquals(0.00, (Double) metric3.metricValue(), DELTA);
assertEquals(0.00, (Double) metric4.metricValue(), DELTA);

// Record 1 window worth of values to overlap with the last window
// that is half 1.0 and half 2.0
for (int i = 0; i != 50; ++i) {
// Record 50 events to 4th bucket
for (int i = 0; i < 50; ++i) {
frequencies.record(config, 4.0, time.milliseconds());
}
assertEquals(0.25, (Double) metric1.metricValue(), DELTA);
assertEquals(0.25, (Double) metric2.metricValue(), DELTA);
assertEquals(0.00, (Double) metric3.metricValue(), DELTA);
assertEquals(0.50, (Double) metric4.metricValue(), DELTA);
assertEquals(0.25, metricValue("1"), DELTA);
assertEquals(0.25, metricValue("2"), DELTA);
assertEquals(0.00, metricValue("3"), DELTA);
assertEquals(0.50, metricValue("4"), DELTA);
}

protected MetricName name(String metricName) {
return new MetricName(metricName, "group-id", "desc", Collections.emptyMap());
private MetricName metricName(String name) {
return new MetricName(name, "group-id", "desc", Collections.emptyMap());
}

protected Frequency freq(String name, double value) {
return new Frequency(name(name), value);
private Frequency freq(String name, double value) {
return new Frequency(metricName(name), value);
}

private double metricValue(String name) {
return (double) metrics.metrics().get(metricName(name)).metricValue();
}
}
Loading

0 comments on commit 962f771

Please sign in to comment.