Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MINOR: Fix rate metric spikes #15889

Merged
merged 21 commits into from
May 29, 2024
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
*/
public class MetricConfig {

public static final int DEFAULT_NUM_SAMPLES = 2;

private Quota quota;
private int samples;
private long eventWindow;
Expand All @@ -34,7 +36,7 @@ public class MetricConfig {

public MetricConfig() {
this.quota = null;
this.samples = 2;
this.samples = DEFAULT_NUM_SAMPLES;
this.eventWindow = Long.MAX_VALUE;
this.timeWindowMs = TimeUnit.MILLISECONDS.convert(30, TimeUnit.SECONDS);
this.tags = new LinkedHashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,15 +74,18 @@ public long windowSize(MetricConfig config, long now) {
/*
* Here we check the total amount of time elapsed since the oldest non-obsolete window.
* This gives the total windowSize of the batch which is the time used for Rate computation.
* However, there is an issue if we do not have sufficient data for e.g. if only 1 second has elapsed in a 30 second
* However, there is an issue if we do not have sufficient data for e.g. if only 1 second has elapsed in a 30-second
* window, the measured rate will be very high.
* Hence we assume that the elapsed time is always N-1 complete windows plus whatever fraction of the final window is complete.
* Hence, we assume that the elapsed time is always N-1 complete windows plus whatever fraction of the final window is complete.
*
* Note that we could simply count the amount of time elapsed in the current window and add n-1 windows to get the total time,
* but this approach does not account for sleeps. SampledStat only creates samples whenever record is called;
* if no record is called for a period of time, that time is not accounted for in windowSize and produces incorrect results.
*
* Note also, that totalElapsedTimeMs can be larger than the monitored window size,
* if the oldest sample started before the window while overlapping it.
*/
long totalElapsedTimeMs = now - stat.oldest(now).lastWindowMs;
long totalElapsedTimeMs = now - stat.oldest(now).startTimeMs;
emitskevich-blp marked this conversation as resolved.
Show resolved Hide resolved
// Check how many full windows of data we have currently retained
int numFullWindows = (int) (totalElapsedTimeMs / config.timeWindowMs());
int minFullWindows = config.samples() - 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ public abstract class SampledStat implements MeasurableStat {

public SampledStat(double initialValue) {
this.initialValue = initialValue;
this.samples = new ArrayList<>(2);
// keep one extra placeholder for "overlapping sample" (see purgeObsoleteSamples() logic)
this.samples = new ArrayList<>(MetricConfig.DEFAULT_NUM_SAMPLES + 1);
}

@Override
Expand All @@ -50,10 +51,13 @@ public void record(MetricConfig config, double value, long timeMs) {
sample = advance(config, timeMs);
update(sample, config, value, timeMs);
sample.eventCount += 1;
sample.lastEventMs = timeMs;
}

private Sample advance(MetricConfig config, long timeMs) {
this.current = (this.current + 1) % config.samples();
// keep one extra placeholder for "overlapping sample" (see purgeObsoleteSamples() logic)
int maxSamples = config.samples() + 1;
this.current = (this.current + 1) % maxSamples;
if (this.current >= samples.size()) {
Sample sample = newSample(timeMs);
this.samples.add(sample);
Expand Down Expand Up @@ -87,7 +91,7 @@ public Sample oldest(long now) {
Sample oldest = this.samples.get(0);
for (int i = 1; i < this.samples.size(); i++) {
Sample curr = this.samples.get(i);
if (curr.lastWindowMs < oldest.lastWindowMs)
if (curr.startTimeMs < oldest.startTimeMs)
oldest = curr;
}
return oldest;
Expand All @@ -106,44 +110,51 @@ public String toString() {

public abstract double combine(List<Sample> samples, MetricConfig config, long now);

/* Timeout any windows that have expired in the absence of any events */
// purge any samples that lack observed events within the monitored window
protected void purgeObsoleteSamples(MetricConfig config, long now) {
long expireAge = config.samples() * config.timeWindowMs();
for (Sample sample : samples) {
if (now - sample.lastWindowMs >= expireAge)
// samples overlapping the monitored window are kept,
// even if they started before it
if (now - sample.lastEventMs >= expireAge) {
sample.reset(now);
}
}
}

protected static class Sample {
public double initialValue;
public long eventCount;
public long lastWindowMs;
public long startTimeMs;
emitskevich-blp marked this conversation as resolved.
Show resolved Hide resolved
public long lastEventMs;
public double value;

public Sample(double initialValue, long now) {
this.initialValue = initialValue;
this.eventCount = 0;
this.lastWindowMs = now;
this.startTimeMs = now;
this.lastEventMs = now;
this.value = initialValue;
}

public void reset(long now) {
this.eventCount = 0;
this.lastWindowMs = now;
this.startTimeMs = now;
this.lastEventMs = now;
this.value = initialValue;
}

public boolean isComplete(long timeMs, MetricConfig config) {
return timeMs - lastWindowMs >= config.timeWindowMs() || eventCount >= config.eventWindow();
return timeMs - startTimeMs >= config.timeWindowMs() || eventCount >= config.eventWindow();
emitskevich-blp marked this conversation as resolved.
Show resolved Hide resolved
}

@Override
public String toString() {
return "Sample(" +
"value=" + value +
", eventCount=" + eventCount +
", lastWindowMs=" + lastWindowMs +
", startTimeMs=" + startTimeMs +
emitskevich-blp marked this conversation as resolved.
Show resolved Hide resolved
", lastEventMs=" + lastEventMs +
", initialValue=" + initialValue +
')';
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public class SimpleRate extends Rate {
@Override
public long windowSize(MetricConfig config, long now) {
stat.purgeObsoleteSamples(config, now);
long elapsed = now - stat.oldest(now).lastWindowMs;
long elapsed = now - stat.oldest(now).startTimeMs;
return Math.max(elapsed, config.timeWindowMs());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -336,17 +336,6 @@ public void testRemoveMetric() {
assertEquals(size, metrics.metrics().size());
}

@Test
public void testEventWindowing() {
WindowedCount count = new WindowedCount();
MetricConfig config = new MetricConfig().eventWindow(1).samples(2);
count.record(config, 1.0, time.milliseconds());
count.record(config, 1.0, time.milliseconds());
assertEquals(2.0, count.measure(config, time.milliseconds()), EPS);
count.record(config, 1.0, time.milliseconds()); // first event times out
assertEquals(2.0, count.measure(config, time.milliseconds()), EPS);
}

@Test
public void testTimeWindowing() {
WindowedCount count = new WindowedCount();
Expand Down Expand Up @@ -475,28 +464,13 @@ public void testPercentiles() {
Metric p50 = this.metrics.metrics().get(metrics.metricName("test.p50", "grp1"));
Metric p75 = this.metrics.metrics().get(metrics.metricName("test.p75", "grp1"));

// record two windows worth of sequential values
for (int i = 0; i < buckets; i++)
// record 1-100 sequential values
for (int i = 0; i < buckets; i++) {
emitskevich-blp marked this conversation as resolved.
Show resolved Hide resolved
sensor.record(i);

assertEquals(25, (Double) p25.metricValue(), 1.0);
assertEquals(50, (Double) p50.metricValue(), 1.0);
assertEquals(75, (Double) p75.metricValue(), 1.0);

for (int i = 0; i < buckets; i++)
emitskevich-blp marked this conversation as resolved.
Show resolved Hide resolved
sensor.record(0.0);

assertEquals(0.0, (Double) p25.metricValue(), 1.0);
assertEquals(0.0, (Double) p50.metricValue(), 1.0);
assertEquals(0.0, (Double) p75.metricValue(), 1.0);

// record two more windows worth of sequential values
for (int i = 0; i < buckets; i++)
sensor.record(i);

assertEquals(25, (Double) p25.metricValue(), 1.0);
assertEquals(50, (Double) p50.metricValue(), 1.0);
assertEquals(75, (Double) p75.metricValue(), 1.0);
}
assertEquals(25, (Double) p25.metricValue());
assertEquals(50, (Double) p50.metricValue());
assertEquals(75, (Double) p75.metricValue());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
*/
package org.apache.kafka.common.metrics.stats;

import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.CompoundStat.NamedMeasurable;
import org.apache.kafka.common.metrics.JmxReporter;
Expand Down Expand Up @@ -73,14 +72,14 @@ public void testMoreFrequencyParametersThanBuckets() {
}

@Test
public void testBooleanFrequencies() {
public void testBooleanFrequenciesStrategy1() {
emitskevich-blp marked this conversation as resolved.
Show resolved Hide resolved
MetricName metricTrue = name("true");
MetricName metricFalse = name("false");
Frequencies frequencies = Frequencies.forBooleanValues(metricFalse, metricTrue);
final NamedMeasurable falseMetric = frequencies.stats().get(0);
final NamedMeasurable trueMetric = frequencies.stats().get(1);

// Record 2 windows worth of values
// Record 25 "false" and 75 "true"
for (int i = 0; i != 25; ++i) {
frequencies.record(config, 0.0, time.milliseconds());
}
Expand All @@ -89,8 +88,17 @@ public void testBooleanFrequencies() {
}
assertEquals(0.25, falseMetric.stat().measure(config, time.milliseconds()), DELTA);
assertEquals(0.75, trueMetric.stat().measure(config, time.milliseconds()), DELTA);
}

// Record 2 more windows worth of values
@Test
public void testBooleanFrequenciesStrategy2() {
emitskevich-blp marked this conversation as resolved.
Show resolved Hide resolved
MetricName metricTrue = name("true");
MetricName metricFalse = name("false");
Frequencies frequencies = Frequencies.forBooleanValues(metricFalse, metricTrue);
final NamedMeasurable falseMetric = frequencies.stats().get(0);
final NamedMeasurable trueMetric = frequencies.stats().get(1);

// Record 40 "false" and 60 "true"
for (int i = 0; i != 40; ++i) {
frequencies.record(config, 0.0, time.milliseconds());
}
Expand All @@ -102,58 +110,69 @@ public void testBooleanFrequencies() {
}

@Test
public void testUseWithMetrics() {
MetricName name1 = name("1");
MetricName name2 = name("2");
MetricName name3 = name("3");
MetricName name4 = name("4");
Frequencies frequencies = new Frequencies(4, 1.0, 4.0,
new Frequency(name1, 1.0),
new Frequency(name2, 2.0),
new Frequency(name3, 3.0),
new Frequency(name4, 4.0));
public void testWithMetricsStrategy1() {
Frequencies frequencies = new Frequencies(4, 1.0, 4.0, freq("1", 1.0),
freq("2", 2.0), freq("3", 3.0), freq("4", 4.0));
Sensor sensor = metrics.sensor("test", config);
sensor.add(frequencies);
Metric metric1 = this.metrics.metrics().get(name1);
Metric metric2 = this.metrics.metrics().get(name2);
Metric metric3 = this.metrics.metrics().get(name3);
Metric metric4 = this.metrics.metrics().get(name4);

// Record 2 windows worth of values
for (int i = 0; i != 100; ++i) {
// Record 100 events uniformly between all buckets
for (int i = 0; i < 100; ++i) {
frequencies.record(config, i % 4 + 1, time.milliseconds());
}
assertEquals(0.25, (Double) metric1.metricValue(), DELTA);
assertEquals(0.25, (Double) metric2.metricValue(), DELTA);
assertEquals(0.25, (Double) metric3.metricValue(), DELTA);
assertEquals(0.25, (Double) metric4.metricValue(), DELTA);
assertEquals(0.25, metricValueFor("1"), DELTA);
assertEquals(0.25, metricValueFor("2"), DELTA);
assertEquals(0.25, metricValueFor("3"), DELTA);
assertEquals(0.25, metricValueFor("4"), DELTA);
}

@Test
public void testWithMetricsStrategy2() {
Frequencies frequencies = new Frequencies(4, 1.0, 4.0, freq("1", 1.0),
freq("2", 2.0), freq("3", 3.0), freq("4", 4.0));
Sensor sensor = metrics.sensor("test", config);
sensor.add(frequencies);

// Record 100 events half-half between 1st and 2nd buckets
for (int i = 0; i < 100; ++i) {
frequencies.record(config, i % 2 + 1, time.milliseconds());
}
assertEquals(0.50, metricValueFor("1"), DELTA);
assertEquals(0.50, metricValueFor("2"), DELTA);
assertEquals(0.00, metricValueFor("3"), DELTA);
assertEquals(0.00, metricValueFor("4"), DELTA);
}

// Record 2 windows worth of values
for (int i = 0; i != 100; ++i) {
@Test
public void testWithMetricsStrategy3() {
Frequencies frequencies = new Frequencies(4, 1.0, 4.0, freq("1", 1.0),
freq("2", 2.0), freq("3", 3.0), freq("4", 4.0));
Sensor sensor = metrics.sensor("test", config);
sensor.add(frequencies);

// Record 50 events half-half between 1st and 2nd buckets
for (int i = 0; i < 50; ++i) {
frequencies.record(config, i % 2 + 1, time.milliseconds());
}
assertEquals(0.50, (Double) metric1.metricValue(), DELTA);
assertEquals(0.50, (Double) metric2.metricValue(), DELTA);
assertEquals(0.00, (Double) metric3.metricValue(), DELTA);
assertEquals(0.00, (Double) metric4.metricValue(), DELTA);

// Record 1 window worth of values to overlap with the last window
// that is half 1.0 and half 2.0
for (int i = 0; i != 50; ++i) {
// Record 50 events to 4th bucket
for (int i = 0; i < 50; ++i) {
frequencies.record(config, 4.0, time.milliseconds());
}
assertEquals(0.25, (Double) metric1.metricValue(), DELTA);
assertEquals(0.25, (Double) metric2.metricValue(), DELTA);
assertEquals(0.00, (Double) metric3.metricValue(), DELTA);
assertEquals(0.50, (Double) metric4.metricValue(), DELTA);
assertEquals(0.25, metricValueFor("1"), DELTA);
assertEquals(0.25, metricValueFor("2"), DELTA);
assertEquals(0.00, metricValueFor("3"), DELTA);
assertEquals(0.50, metricValueFor("4"), DELTA);
}

protected MetricName name(String metricName) {
private MetricName name(String metricName) {
emitskevich-blp marked this conversation as resolved.
Show resolved Hide resolved
return new MetricName(metricName, "group-id", "desc", Collections.emptyMap());
}

protected Frequency freq(String name, double value) {
private Frequency freq(String name, double value) {
return new Frequency(name(name), value);
}

/**
 * Reads the current value of the registered metric with the given short name.
 *
 * <p>The lookup key is built with {@code name(String)}, so it resolves against the
 * same metric names that {@code freq(String, double)} registers in this test class.
 * The value is unboxed to a primitive, so a missing metric or a non-{@code Double}
 * value fails the test with an exception rather than returning a default.
 *
 * @param name short metric name, e.g. {@code "1"}
 * @return the metric's current value as a primitive double
 */
private double metricValueFor(String name) {
    // Resolve the metric first, then unbox its value in a separate step.
    Object currentValue = metrics.metrics().get(name(name)).metricValue();
    return (double) currentValue;
}
}
Loading