Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MINOR: Fix rate metric spikes #15889

Merged
merged 21 commits into from
May 29, 2024
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -74,15 +74,18 @@ public long windowSize(MetricConfig config, long now) {
/*
* Here we check the total amount of time elapsed since the oldest non-obsolete window.
* This gives the total windowSize of the batch, which is the time used for Rate computation.
* However, there is an issue if we do not have sufficient data for e.g. if only 1 second has elapsed in a 30 second
* However, there is an issue if we do not have sufficient data for e.g. if only 1 second has elapsed in a 30-second
* window, the measured rate will be very high.
* Hence we assume that the elapsed time is always N-1 complete windows plus whatever fraction of the final window is complete.
* Hence, we assume that the elapsed time is always N-1 complete windows plus whatever fraction of the final window is complete.
*
* Note that we could simply count the amount of time elapsed in the current window and add n-1 windows to get the total time,
* but this approach does not account for sleeps. SampledStat only creates samples whenever record is called;
* if no record is called for a period of time, that time is not accounted for in windowSize and produces incorrect results.
*
* Note also, that totalElapsedTimeMs can be larger than the monitored window size,
* if the oldest sample started before the window while overlapping it.
*/
long totalElapsedTimeMs = now - stat.oldest(now).lastWindowMs;
long totalElapsedTimeMs = now - stat.oldest(now).startTimeMs;
emitskevich-blp marked this conversation as resolved.
Show resolved Hide resolved
// Check how many full windows of data we have currently retained
int numFullWindows = (int) (totalElapsedTimeMs / config.timeWindowMs());
int minFullWindows = config.samples() - 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public abstract class SampledStat implements MeasurableStat {

public SampledStat(double initialValue) {
this.initialValue = initialValue;
this.samples = new ArrayList<>(2);
this.samples = new ArrayList<>(3);
emitskevich-blp marked this conversation as resolved.
Show resolved Hide resolved
}

@Override
Expand All @@ -50,10 +50,13 @@ public void record(MetricConfig config, double value, long timeMs) {
sample = advance(config, timeMs);
update(sample, config, value, timeMs);
sample.eventCount += 1;
sample.lastEventMs = timeMs;
}

private Sample advance(MetricConfig config, long timeMs) {
this.current = (this.current + 1) % config.samples();
// need to keep one extra sample (see purgeObsoleteSamples() logic)
chia7712 marked this conversation as resolved.
Show resolved Hide resolved
int maxSamples = config.samples() + 1;
this.current = (this.current + 1) % maxSamples;
if (this.current >= samples.size()) {
Sample sample = newSample(timeMs);
this.samples.add(sample);
Expand Down Expand Up @@ -87,7 +90,7 @@ public Sample oldest(long now) {
Sample oldest = this.samples.get(0);
for (int i = 1; i < this.samples.size(); i++) {
Sample curr = this.samples.get(i);
if (curr.lastWindowMs < oldest.lastWindowMs)
if (curr.startTimeMs < oldest.startTimeMs)
oldest = curr;
}
return oldest;
Expand All @@ -106,44 +109,51 @@ public String toString() {

public abstract double combine(List<Sample> samples, MetricConfig config, long now);

/* Timeout any windows that have expired in the absence of any events */
// purge any samples that lack observed events within the monitored window
protected void purgeObsoleteSamples(MetricConfig config, long now) {
long expireAge = config.samples() * config.timeWindowMs();
for (Sample sample : samples) {
if (now - sample.lastWindowMs >= expireAge)
// samples overlapping the monitored window are kept,
// even if they started before it
if (now - sample.lastEventMs >= expireAge) {
sample.reset(now);
}
}
}

protected static class Sample {
public double initialValue;
public long eventCount;
public long lastWindowMs;
public long startTimeMs;
emitskevich-blp marked this conversation as resolved.
Show resolved Hide resolved
public long lastEventMs;
public double value;

public Sample(double initialValue, long now) {
this.initialValue = initialValue;
this.eventCount = 0;
this.lastWindowMs = now;
this.startTimeMs = now;
this.lastEventMs = now;
this.value = initialValue;
}

public void reset(long now) {
this.eventCount = 0;
this.lastWindowMs = now;
this.startTimeMs = now;
this.lastEventMs = now;
this.value = initialValue;
}

public boolean isComplete(long timeMs, MetricConfig config) {
return timeMs - lastWindowMs >= config.timeWindowMs() || eventCount >= config.eventWindow();
return timeMs - startTimeMs >= config.timeWindowMs() || eventCount >= config.eventWindow();
emitskevich-blp marked this conversation as resolved.
Show resolved Hide resolved
}

@Override
public String toString() {
return "Sample(" +
"value=" + value +
", eventCount=" + eventCount +
", lastWindowMs=" + lastWindowMs +
", startTimeMs=" + startTimeMs +
emitskevich-blp marked this conversation as resolved.
Show resolved Hide resolved
", lastEventMs=" + lastEventMs +
", initialValue=" + initialValue +
')';
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public class SimpleRate extends Rate {
@Override
public long windowSize(MetricConfig config, long now) {
stat.purgeObsoleteSamples(config, now);
long elapsed = now - stat.oldest(now).lastWindowMs;
long elapsed = now - stat.oldest(now).startTimeMs;
return Math.max(elapsed, config.timeWindowMs());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,23 +21,28 @@
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;

import static java.util.concurrent.TimeUnit.SECONDS;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class RateTest {
private static final double EPS = 0.000001;
private Rate r;
private Time timeClock;
private Rate rate;
private Time time;

@BeforeEach
public void setup() {
r = new Rate();
timeClock = new MockTime();
rate = new Rate();
time = new MockTime();
}

// Tests the scenario where the recording and measurement are done before the window for the first sample finishes
Expand All @@ -48,12 +53,12 @@ public void testRateWithNoPriorAvailableSamples(int numSample, int sampleWindowS
final MetricConfig config = new MetricConfig().samples(numSample).timeWindow(sampleWindowSizeSec, TimeUnit.SECONDS);
final double sampleValue = 50.0;
// record at beginning of the window
r.record(config, sampleValue, timeClock.milliseconds());
rate.record(config, sampleValue, time.milliseconds());
// forward time till almost the end of window
final long measurementTime = TimeUnit.SECONDS.toMillis(sampleWindowSizeSec) - 1;
timeClock.sleep(measurementTime);
time.sleep(measurementTime);
// calculate rate at almost the end of window
final double observedRate = r.measure(config, timeClock.milliseconds());
final double observedRate = rate.measure(config, time.milliseconds());
assertFalse(Double.isNaN(observedRate));

// In a scenario where sufficient number of samples is not available yet, the rate calculation algorithm assumes
Expand All @@ -64,4 +69,31 @@ public void testRateWithNoPriorAvailableSamples(int numSample, int sampleWindowS
double expectedRatePerSec = sampleValue / windowSize;
assertEquals(expectedRatePerSec, observedRate, EPS);
}

// Record an event every 100 ms on average, shifting some by 1 ms back or forth for fine-grained
// window control. The expected rate, hence, is 10-11 events/sec depending on the moment of
// measurement. Assertions start from the second window. This test addresses a past issue
// where measurements at the end of a sample led to value spikes.
emitskevich-blp marked this conversation as resolved.
Show resolved Hide resolved
@Test
public void testRateIsConsistentAfterTheFirstWindow() {
MetricConfig config = new MetricConfig().timeWindow(1, SECONDS).samples(2);
List<Integer> steps = Arrays.asList(0, 99, 100, 100, 100, 100, 100, 100, 100, 100, 100);

// start the first window and record events at 0,99,199,...,999 ms
for (int stepMs : steps) {
time.sleep(stepMs);
rate.record(config, 1, time.milliseconds());
}

// making a gap of 100 ms between windows
time.sleep(101);

// start the second window and record events at 0,99,199,...,999 ms
for (int stepMs : steps) {
time.sleep(stepMs);
rate.record(config, 1, time.milliseconds());
double observedRate = rate.measure(config, time.milliseconds());
emitskevich-blp marked this conversation as resolved.
Show resolved Hide resolved
assertTrue(10 <= observedRate && observedRate <= 11);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.metrics.stats;

import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;

import java.util.List;

import static java.util.concurrent.TimeUnit.SECONDS;
import static org.junit.jupiter.api.Assertions.assertEquals;

class SampledStatTest {

private SampledStat stat;
private Time time;

@BeforeEach
public void setup() {
stat = new SampleCount(0);
time = new MockTime();
}

@Test
@DisplayName("Sample should be purged if doesn't overlap the window")
public void testSampleIsPurgedIfDoesntOverlap() {
MetricConfig config = new MetricConfig().timeWindow(1, SECONDS).samples(2);

// Monitored window: 2s. Complete a sample and wait 2.5s after.
completeSample(config);
time.sleep(2500);

double numSamples = stat.measure(config, time.milliseconds());
assertEquals(0, numSamples);
}

@Test
@DisplayName("Sample should be kept if overlaps the window")
public void testSampleIsKeptIfOverlaps() {
MetricConfig config = new MetricConfig().timeWindow(1, SECONDS).samples(2);

// Monitored window: 2s. Complete a sample and wait 1.5s after.
completeSample(config);
time.sleep(1500);

double numSamples = stat.measure(config, time.milliseconds());
assertEquals(1, numSamples);
}

@Test
@DisplayName("Sample should be kept if overlaps the window and is n+1")
public void testSampleIsKeptIfOverlapsAndExtra() {
MetricConfig config = new MetricConfig().timeWindow(1, SECONDS).samples(2);

// Monitored window: 2s. Create 2 samples with gaps in between and
// take a measurement at 2.2s from the start.
completeSample(config);
time.sleep(100);
completeSample(config);
time.sleep(100);
stat.record(config, 1, time.milliseconds());

double numSamples = stat.measure(config, time.milliseconds());
assertEquals(3, numSamples);
}

// Creates a sample with events at the start and at the end. Positions clock at the end.
private void completeSample(MetricConfig config) {
stat.record(config, 1, time.milliseconds());
time.sleep(config.timeWindowMs() - 1);
stat.record(config, 1, time.milliseconds());
time.sleep(1);
}

// measure() of this impl returns the number of samples
static class SampleCount extends SampledStat {

SampleCount(double initialValue) {
chia7712 marked this conversation as resolved.
Show resolved Hide resolved
super(initialValue);
}

@Override
protected void update(Sample sample, MetricConfig config, double value, long timeMs) {
sample.value = 1;
}

@Override
public double combine(List<Sample> samples, MetricConfig config, long now) {
return samples.stream().mapToDouble(s -> s.value).sum();
}
}
}