diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/MetricConfig.java b/clients/src/main/java/org/apache/kafka/common/metrics/MetricConfig.java index 7367e966c014e..a77cc9309bd20 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/MetricConfig.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/MetricConfig.java @@ -25,6 +25,8 @@ */ public class MetricConfig { + public static final int DEFAULT_NUM_SAMPLES = 2; + private Quota quota; private int samples; private long eventWindow; @@ -34,7 +36,7 @@ public class MetricConfig { public MetricConfig() { this.quota = null; - this.samples = 2; + this.samples = DEFAULT_NUM_SAMPLES; this.eventWindow = Long.MAX_VALUE; this.timeWindowMs = TimeUnit.MILLISECONDS.convert(30, TimeUnit.SECONDS); this.tags = new LinkedHashMap<>(); diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java index 09b7c05c8f283..4f15bb9607e1e 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java @@ -74,15 +74,18 @@ public long windowSize(MetricConfig config, long now) { /* * Here we check the total amount of time elapsed since the oldest non-obsolete window. * This give the total windowSize of the batch which is the time used for Rate computation. - * However, there is an issue if we do not have sufficient data for e.g. if only 1 second has elapsed in a 30 second + * However, there is an issue if we do not have sufficient data for e.g. if only 1 second has elapsed in a 30-second * window, the measured rate will be very high. - * Hence we assume that the elapsed time is always N-1 complete windows plus whatever fraction of the final window is complete. + * Hence, we assume that the elapsed time is always N-1 complete windows plus whatever fraction of the final window is complete. * * Note that we could simply count the amount of time elapsed in the current window and add n-1 windows to get the total time, * but this approach does not account for sleeps. SampledStat only creates samples whenever record is called, * if no record is called for a period of time that time is not accounted for in windowSize and produces incorrect results. + * + * Note also, that totalElapsedTimeMs can be larger than the monitored window size, + * if the oldest sample started before the window while overlapping it. */ - long totalElapsedTimeMs = now - stat.oldest(now).lastWindowMs; + long totalElapsedTimeMs = now - stat.oldest(now).startTimeMs; // Check how many full windows of data we have currently retained int numFullWindows = (int) (totalElapsedTimeMs / config.timeWindowMs()); int minFullWindows = config.samples() - 1; diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java b/clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java index df4a95ccaa9b6..f76fccc853bfa 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java @@ -40,7 +40,8 @@ public abstract class SampledStat implements MeasurableStat { public SampledStat(double initialValue) { this.initialValue = initialValue; - this.samples = new ArrayList<>(2); + // keep one extra placeholder for "overlapping sample" (see purgeObsoleteSamples() logic) + this.samples = new ArrayList<>(MetricConfig.DEFAULT_NUM_SAMPLES + 1); } @Override @@ -50,10 +51,13 @@ public void record(MetricConfig config, double value, long timeMs) { sample = advance(config, timeMs); update(sample, config, value, timeMs); sample.eventCount += 1; + sample.lastEventMs = timeMs; } private Sample advance(MetricConfig config, long timeMs) { - this.current = (this.current + 1) % config.samples(); + // keep one extra placeholder for "overlapping sample" (see purgeObsoleteSamples() logic) + int maxSamples = config.samples() + 1; + this.current = (this.current + 1) % maxSamples; if (this.current >= samples.size()) { Sample sample = newSample(timeMs); this.samples.add(sample); @@ -87,7 +91,7 @@ public Sample oldest(long now) { Sample oldest = this.samples.get(0); for (int i = 1; i < this.samples.size(); i++) { Sample curr = this.samples.get(i); - if (curr.lastWindowMs < oldest.lastWindowMs) + if (curr.startTimeMs < oldest.startTimeMs) oldest = curr; } return oldest; @@ -106,36 +110,42 @@ public String toString() { public abstract double combine(List samples, MetricConfig config, long now); - /* Timeout any windows that have expired in the absence of any events */ + // purge any samples that lack observed events within the monitored window protected void purgeObsoleteSamples(MetricConfig config, long now) { long expireAge = config.samples() * config.timeWindowMs(); for (Sample sample : samples) { - if (now - sample.lastWindowMs >= expireAge) + // samples overlapping the monitored window are kept, + // even if they started before it + if (now - sample.lastEventMs >= expireAge) { sample.reset(now); + } } } protected static class Sample { public double initialValue; public long eventCount; - public long lastWindowMs; + public long startTimeMs; + public long lastEventMs; public double value; public Sample(double initialValue, long now) { this.initialValue = initialValue; this.eventCount = 0; - this.lastWindowMs = now; + this.startTimeMs = now; + this.lastEventMs = now; this.value = initialValue; } public void reset(long now) { this.eventCount = 0; - this.lastWindowMs = now; + this.startTimeMs = now; + this.lastEventMs = now; this.value = initialValue; } public boolean isComplete(long timeMs, MetricConfig config) { - return timeMs - lastWindowMs >= config.timeWindowMs() || eventCount >= config.eventWindow(); + return timeMs - startTimeMs >= config.timeWindowMs() || eventCount >= config.eventWindow(); } @Override @@ -143,7 +153,8 @@ public String toString() { return "Sample(" + "value=" + value + ", eventCount=" + eventCount + - ", lastWindowMs=" + lastWindowMs + + ", startTimeMs=" + startTimeMs + + ", lastEventMs=" + lastEventMs + ", initialValue=" + initialValue + ')'; } diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/stats/SimpleRate.java b/clients/src/main/java/org/apache/kafka/common/metrics/stats/SimpleRate.java index 931bd9c35e51f..a632f0254d6b0 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/stats/SimpleRate.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/stats/SimpleRate.java @@ -33,7 +33,7 @@ public class SimpleRate extends Rate { @Override public long windowSize(MetricConfig config, long now) { stat.purgeObsoleteSamples(config, now); - long elapsed = now - stat.oldest(now).lastWindowMs; + long elapsed = now - stat.oldest(now).startTimeMs; return Math.max(elapsed, config.timeWindowMs()); } } diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java index f514281abf314..a4289d8afbd23 100644 --- a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java +++ b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java @@ -336,17 +336,6 @@ public void testRemoveMetric() { assertEquals(size, metrics.metrics().size()); } - @Test - public void testEventWindowing() { - WindowedCount count = new WindowedCount(); - MetricConfig config = new MetricConfig().eventWindow(1).samples(2); - count.record(config, 1.0, time.milliseconds()); - count.record(config, 1.0, time.milliseconds()); - assertEquals(2.0, count.measure(config, time.milliseconds()), EPS); - count.record(config, 1.0, time.milliseconds()); // first event times out - assertEquals(2.0, count.measure(config, time.milliseconds()), EPS); - } - @Test public void testTimeWindowing() { WindowedCount count = new WindowedCount(); @@ -475,28 +464,13 @@ public void testPercentiles() { Metric p50 = this.metrics.metrics().get(metrics.metricName("test.p50", "grp1")); Metric p75 = this.metrics.metrics().get(metrics.metricName("test.p75", "grp1")); - // record two windows worth of sequential values - for (int i = 0; i < buckets; i++) - sensor.record(i); - - assertEquals(25, (Double) p25.metricValue(), 1.0); - assertEquals(50, (Double) p50.metricValue(), 1.0); - assertEquals(75, (Double) p75.metricValue(), 1.0); - - for (int i = 0; i < buckets; i++) - sensor.record(0.0); - - assertEquals(0.0, (Double) p25.metricValue(), 1.0); - assertEquals(0.0, (Double) p50.metricValue(), 1.0); - assertEquals(0.0, (Double) p75.metricValue(), 1.0); - - // record two more windows worth of sequential values + // record 100 sequential values for (int i = 0; i < buckets; i++) sensor.record(i); - assertEquals(25, (Double) p25.metricValue(), 1.0); - assertEquals(50, (Double) p50.metricValue(), 1.0); - assertEquals(75, (Double) p75.metricValue(), 1.0); + assertEquals(25, (Double) p25.metricValue()); + assertEquals(50, (Double) p50.metricValue()); + assertEquals(75, (Double) p75.metricValue()); } @Test diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/stats/FrequenciesTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/stats/FrequenciesTest.java index 344ade22a66ed..ba3067d993d9f 100644 --- a/clients/src/test/java/org/apache/kafka/common/metrics/stats/FrequenciesTest.java +++ b/clients/src/test/java/org/apache/kafka/common/metrics/stats/FrequenciesTest.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.common.metrics.stats; -import org.apache.kafka.common.Metric; import org.apache.kafka.common.MetricName; import org.apache.kafka.common.metrics.CompoundStat.NamedMeasurable; import org.apache.kafka.common.metrics.JmxReporter; @@ -73,14 +72,14 @@ public void testMoreFrequencyParametersThanBuckets() { } @Test - public void testBooleanFrequencies() { - MetricName metricTrue = name("true"); - MetricName metricFalse = name("false"); + public void testBooleanFrequenciesStrategy1() { + MetricName metricTrue = metricName("true"); + MetricName metricFalse = metricName("false"); Frequencies frequencies = Frequencies.forBooleanValues(metricFalse, metricTrue); final NamedMeasurable falseMetric = frequencies.stats().get(0); final NamedMeasurable trueMetric = frequencies.stats().get(1); - // Record 2 windows worth of values + // Record 25 "false" and 75 "true" for (int i = 0; i != 25; ++i) { frequencies.record(config, 0.0, time.milliseconds()); } @@ -89,8 +88,17 @@ public void testBooleanFrequencies() { } assertEquals(0.25, falseMetric.stat().measure(config, time.milliseconds()), DELTA); assertEquals(0.75, trueMetric.stat().measure(config, time.milliseconds()), DELTA); + } + + @Test + public void testBooleanFrequenciesStrategy2() { + MetricName metricTrue = metricName("true"); + MetricName metricFalse = metricName("false"); + Frequencies frequencies = Frequencies.forBooleanValues(metricFalse, metricTrue); + final NamedMeasurable falseMetric = frequencies.stats().get(0); + final NamedMeasurable trueMetric = frequencies.stats().get(1); - // Record 2 more windows worth of values + // Record 40 "false" and 60 "true" for (int i = 0; i != 40; ++i) { frequencies.record(config, 0.0, time.milliseconds()); } @@ -102,58 +110,69 @@ public void testBooleanFrequencies() { } @Test - public void testUseWithMetrics() { - MetricName name1 = name("1"); - MetricName name2 = name("2"); - MetricName name3 = name("3"); - MetricName name4 = name("4"); - Frequencies frequencies = new Frequencies(4, 1.0, 4.0, - new Frequency(name1, 1.0), - new Frequency(name2, 2.0), - new Frequency(name3, 3.0), - new Frequency(name4, 4.0)); + public void testWithMetricsStrategy1() { + Frequencies frequencies = new Frequencies(4, 1.0, 4.0, freq("1", 1.0), + freq("2", 2.0), freq("3", 3.0), freq("4", 4.0)); Sensor sensor = metrics.sensor("test", config); sensor.add(frequencies); - Metric metric1 = this.metrics.metrics().get(name1); - Metric metric2 = this.metrics.metrics().get(name2); - Metric metric3 = this.metrics.metrics().get(name3); - Metric metric4 = this.metrics.metrics().get(name4); - // Record 2 windows worth of values - for (int i = 0; i != 100; ++i) { + // Record 100 events uniformly between all buckets + for (int i = 0; i < 100; ++i) { frequencies.record(config, i % 4 + 1, time.milliseconds()); } - assertEquals(0.25, (Double) metric1.metricValue(), DELTA); - assertEquals(0.25, (Double) metric2.metricValue(), DELTA); - assertEquals(0.25, (Double) metric3.metricValue(), DELTA); - assertEquals(0.25, (Double) metric4.metricValue(), DELTA); + assertEquals(0.25, metricValue("1"), DELTA); + assertEquals(0.25, metricValue("2"), DELTA); + assertEquals(0.25, metricValue("3"), DELTA); + assertEquals(0.25, metricValue("4"), DELTA); + } + + @Test + public void testWithMetricsStrategy2() { + Frequencies frequencies = new Frequencies(4, 1.0, 4.0, freq("1", 1.0), + freq("2", 2.0), freq("3", 3.0), freq("4", 4.0)); + Sensor sensor = metrics.sensor("test", config); + sensor.add(frequencies); + + // Record 100 events half-half between 1st and 2nd buckets + for (int i = 0; i < 100; ++i) { + frequencies.record(config, i % 2 + 1, time.milliseconds()); + } + assertEquals(0.50, metricValue("1"), DELTA); + assertEquals(0.50, metricValue("2"), DELTA); + assertEquals(0.00, metricValue("3"), DELTA); + assertEquals(0.00, metricValue("4"), DELTA); + } - // Record 2 windows worth of values - for (int i = 0; i != 100; ++i) { + @Test + public void testWithMetricsStrategy3() { + Frequencies frequencies = new Frequencies(4, 1.0, 4.0, freq("1", 1.0), + freq("2", 2.0), freq("3", 3.0), freq("4", 4.0)); + Sensor sensor = metrics.sensor("test", config); + sensor.add(frequencies); + + // Record 50 events half-half between 1st and 2nd buckets + for (int i = 0; i < 50; ++i) { frequencies.record(config, i % 2 + 1, time.milliseconds()); } - assertEquals(0.50, (Double) metric1.metricValue(), DELTA); - assertEquals(0.50, (Double) metric2.metricValue(), DELTA); - assertEquals(0.00, (Double) metric3.metricValue(), DELTA); - assertEquals(0.00, (Double) metric4.metricValue(), DELTA); - - // Record 1 window worth of values to overlap with the last window - // that is half 1.0 and half 2.0 - for (int i = 0; i != 50; ++i) { + // Record 50 events to 4th bucket + for (int i = 0; i < 50; ++i) { frequencies.record(config, 4.0, time.milliseconds()); } - assertEquals(0.25, (Double) metric1.metricValue(), DELTA); - assertEquals(0.25, (Double) metric2.metricValue(), DELTA); - assertEquals(0.00, (Double) metric3.metricValue(), DELTA); - assertEquals(0.50, (Double) metric4.metricValue(), DELTA); + assertEquals(0.25, metricValue("1"), DELTA); + assertEquals(0.25, metricValue("2"), DELTA); + assertEquals(0.00, metricValue("3"), DELTA); + assertEquals(0.50, metricValue("4"), DELTA); } - protected MetricName name(String metricName) { - return new MetricName(metricName, "group-id", "desc", Collections.emptyMap()); + private MetricName metricName(String name) { + return new MetricName(name, "group-id", "desc", Collections.emptyMap()); } - protected Frequency freq(String name, double value) { - return new Frequency(name(name), value); + private Frequency freq(String name, double value) { + return new Frequency(metricName(name), value); } + private double metricValue(String name) { + return (double) metrics.metrics().get(metricName(name)).metricValue(); + } } diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/stats/RateTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/stats/RateTest.java index 04c5ca1292f98..2e8662f1f96c9 100644 --- a/clients/src/test/java/org/apache/kafka/common/metrics/stats/RateTest.java +++ b/clients/src/test/java/org/apache/kafka/common/metrics/stats/RateTest.java @@ -21,23 +21,28 @@ import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.CsvSource; +import java.util.Arrays; +import java.util.List; import java.util.concurrent.TimeUnit; +import static java.util.concurrent.TimeUnit.SECONDS; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; public class RateTest { private static final double EPS = 0.000001; - private Rate r; - private Time timeClock; + private Rate rate; + private Time time; @BeforeEach public void setup() { - r = new Rate(); - timeClock = new MockTime(); + rate = new Rate(); + time = new MockTime(); } // Tests the scenario where the recording and measurement is done before the window for first sample finishes @@ -48,12 +53,12 @@ public void testRateWithNoPriorAvailableSamples(int numSample, int sampleWindowS final MetricConfig config = new MetricConfig().samples(numSample).timeWindow(sampleWindowSizeSec, TimeUnit.SECONDS); final double sampleValue = 50.0; // record at beginning of the window - r.record(config, sampleValue, timeClock.milliseconds()); + rate.record(config, sampleValue, time.milliseconds()); // forward time till almost the end of window final long measurementTime = TimeUnit.SECONDS.toMillis(sampleWindowSizeSec) - 1; - timeClock.sleep(measurementTime); + time.sleep(measurementTime); // calculate rate at almost the end of window - final double observedRate = r.measure(config, timeClock.milliseconds()); + final double observedRate = rate.measure(config, time.milliseconds()); assertFalse(Double.isNaN(observedRate)); // In a scenario where sufficient number of samples is not available yet, the rate calculation algorithm assumes @@ -64,4 +69,34 @@ public void testRateWithNoPriorAvailableSamples(int numSample, int sampleWindowS double expectedRatePerSec = sampleValue / windowSize; assertEquals(expectedRatePerSec, observedRate, EPS); } + + // Record an event every 100 ms on average, moving some 1 ms back or forth for fine-grained + // window control. The expected rate, hence, is 10-11 events/sec depending on the moment of + // measurement. Start assertions from the second window. This test covers the case where a + // sample window partially overlaps with the monitored window. + @Test + public void testRateIsConsistentAfterTheFirstWindow() { + MetricConfig config = new MetricConfig().timeWindow(1, SECONDS).samples(2); + List steps = Arrays.asList(0, 99, 100, 100, 100, 100, 100, 100, 100, 100, 100); + + // start the first window and record events at 0,99,199,...,999 ms + for (int stepMs : steps) { + time.sleep(stepMs); + rate.record(config, 1, time.milliseconds()); + } + + // making a gap of 100 ms between windows + time.sleep(101); + + // start the second window and record events at 0,99,199,...,999 ms + for (int stepMs : steps) { + time.sleep(stepMs); + rate.record(config, 1, time.milliseconds()); + double observedRate = rate.measure(config, time.milliseconds()); + assertTrue(10 <= observedRate && observedRate <= 11); + // make sure measurements are repeatable with the same timestamp + double measuredAgain = rate.measure(config, time.milliseconds()); + assertEquals(observedRate, measuredAgain); + } + } } diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/stats/SampledStatTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/stats/SampledStatTest.java new file mode 100644 index 0000000000000..7b45131b9b481 --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/common/metrics/stats/SampledStatTest.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.metrics.stats; + +import org.apache.kafka.common.metrics.MetricConfig; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.utils.Time; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; + +import java.util.List; + +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.junit.jupiter.api.Assertions.assertEquals; + +class SampledStatTest { + + private SampledStat stat; + private Time time; + + @BeforeEach + public void setup() { + stat = new SampleCount(); + time = new MockTime(); + } + + @Test + @DisplayName("Sample should be purged if doesn't overlap the window") + public void testSampleIsPurgedIfDoesntOverlap() { + MetricConfig config = new MetricConfig().timeWindow(1, SECONDS).samples(2); + + // Monitored window: 2s. Complete a sample and wait 2.5s after. + completeSample(config); + time.sleep(2500); + + double numSamples = stat.measure(config, time.milliseconds()); + assertEquals(0, numSamples); + } + + @Test + @DisplayName("Sample should be kept if overlaps the window") + public void testSampleIsKeptIfOverlaps() { + MetricConfig config = new MetricConfig().timeWindow(1, SECONDS).samples(2); + + // Monitored window: 2s. Complete a sample and wait 1.5s after. + completeSample(config); + time.sleep(1500); + + double numSamples = stat.measure(config, time.milliseconds()); + assertEquals(1, numSamples); + } + + @Test + @DisplayName("Sample should be kept if overlaps the window and is n+1") + public void testSampleIsKeptIfOverlapsAndExtra() { + MetricConfig config = new MetricConfig().timeWindow(1, SECONDS).samples(2); + + // Monitored window: 2s. Create 2 samples with gaps in between and + // take a measurement at 2.2s from the start. + completeSample(config); + time.sleep(100); + completeSample(config); + time.sleep(100); + stat.record(config, 1, time.milliseconds()); + + double numSamples = stat.measure(config, time.milliseconds()); + assertEquals(3, numSamples); + } + + // Creates a sample with events at the start and at the end. Positions clock at the end. + private void completeSample(MetricConfig config) { + stat.record(config, 1, time.milliseconds()); + time.sleep(config.timeWindowMs() - 1); + stat.record(config, 1, time.milliseconds()); + time.sleep(1); + } + + // measure() of this impl returns the number of samples + static class SampleCount extends SampledStat { + + SampleCount() { + super(0); + } + + @Override + protected void update(Sample sample, MetricConfig config, double value, long timeMs) { + sample.value = 1; + } + + @Override + public double combine(List samples, MetricConfig config, long now) { + return samples.stream().mapToDouble(s -> s.value).sum(); + } + } +}