diff --git a/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java b/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java index 3bd90f67c0e61..ba10af28c8a1a 100644 --- a/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java +++ b/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java @@ -27,7 +27,6 @@ import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.concurrent.TimeUnit; import java.util.function.BiConsumer; import java.util.function.LongSupplier; import java.util.stream.Collectors; @@ -141,8 +140,8 @@ void innerExecute(int currentProcessor, IngestDocument ingestDocument, BiConsume final long startTimeInNanos = relativeTimeProvider.getAsLong(); metric.preIngest(); processor.execute(ingestDocument, (result, e) -> { - long ingestTimeInMillis = TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - startTimeInNanos); - metric.postIngest(ingestTimeInMillis); + long ingestTimeInNanos = relativeTimeProvider.getAsLong() - startTimeInNanos; + metric.postIngest(ingestTimeInNanos); if (e != null) { metric.ingestFailed(); diff --git a/server/src/main/java/org/elasticsearch/ingest/ConditionalProcessor.java b/server/src/main/java/org/elasticsearch/ingest/ConditionalProcessor.java index 620737c36d4eb..0bc0dcfb9afdd 100644 --- a/server/src/main/java/org/elasticsearch/ingest/ConditionalProcessor.java +++ b/server/src/main/java/org/elasticsearch/ingest/ConditionalProcessor.java @@ -36,7 +36,6 @@ import java.util.ListIterator; import java.util.Map; import java.util.Set; -import java.util.concurrent.TimeUnit; import java.util.function.BiConsumer; import java.util.function.Function; import java.util.function.LongSupplier; @@ -103,8 +102,8 @@ public void execute(IngestDocument ingestDocument, BiConsumer { - long ingestTimeInMillis = TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - startTimeInNanos); - metric.postIngest(ingestTimeInMillis); + long ingestTimeInNanos = relativeTimeProvider.getAsLong() - startTimeInNanos; + metric.postIngest(ingestTimeInNanos); if (e != null) { metric.ingestFailed(); handler.accept(null, e); diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestMetric.java b/server/src/main/java/org/elasticsearch/ingest/IngestMetric.java index bbfd6e21c3b22..caf88b2c27439 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestMetric.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestMetric.java @@ -20,8 +20,8 @@ package org.elasticsearch.ingest; import org.elasticsearch.common.metrics.CounterMetric; -import org.elasticsearch.common.metrics.MeanMetric; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; /** @@ -36,7 +36,7 @@ class IngestMetric { /** * The time it takes to complete the measured item. */ - private final MeanMetric ingestTime = new MeanMetric(); + private final CounterMetric ingestTimeInNanos = new CounterMetric(); /** * The current count of things being measure. Should most likely ever be 0 or 1. * Useful when aggregating multiple metrics to see how many things are in flight. @@ -60,11 +60,11 @@ void preIngest() { /** * Call this after the performing the ingest action, even if the action failed. - * @param ingestTimeInMillis The time it took to perform the action. + * @param ingestTimeInNanos The time it took to perform the action. */ - void postIngest(long ingestTimeInMillis) { + void postIngest(long ingestTimeInNanos) { ingestCurrent.decrementAndGet(); - ingestTime.inc(ingestTimeInMillis); + this.ingestTimeInNanos.inc(ingestTimeInNanos); ingestCount.inc(); } @@ -84,7 +84,7 @@ void ingestFailed() { */ void add(IngestMetric metrics) { ingestCount.inc(metrics.ingestCount.count()); - ingestTime.inc(metrics.ingestTime.sum()); + ingestTimeInNanos.inc(metrics.ingestTimeInNanos.count()); ingestFailed.inc(metrics.ingestFailed.count()); } @@ -92,6 +92,8 @@ void add(IngestMetric metrics) { * Creates a serializable representation for these metrics. */ IngestStats.Stats createStats() { - return new IngestStats.Stats(ingestCount.count(), ingestTime.sum(), ingestCurrent.get(), ingestFailed.count()); + // we track ingestTime at nanosecond resolution, but IngestStats uses millisecond resolution for reporting + long ingestTimeInMillis = TimeUnit.NANOSECONDS.toMillis(ingestTimeInNanos.count()); + return new IngestStats.Stats(ingestCount.count(), ingestTimeInMillis, ingestCurrent.get(), ingestFailed.count()); } } diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index 3816d8c60d0d4..1cb4e7e866e74 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -70,7 +70,6 @@ import java.util.Objects; import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiConsumer; import java.util.function.Consumer; @@ -654,8 +653,8 @@ private void innerExecute(int slot, IndexRequest indexRequest, Pipeline pipeline Map sourceAsMap = indexRequest.sourceAsMap(); IngestDocument ingestDocument = new IngestDocument(index, id, routing, version, versionType, sourceAsMap); ingestDocument.executePipeline(pipeline, (result, e) -> { - long ingestTimeInMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeInNanos); - totalMetrics.postIngest(ingestTimeInMillis); + long ingestTimeInNanos = System.nanoTime() - startTimeInNanos; + totalMetrics.postIngest(ingestTimeInNanos); if (e != null) { totalMetrics.ingestFailed(); handler.accept(e); diff --git a/server/src/main/java/org/elasticsearch/ingest/Pipeline.java b/server/src/main/java/org/elasticsearch/ingest/Pipeline.java index 3d41d991f3e10..ef385aab90751 100644 --- a/server/src/main/java/org/elasticsearch/ingest/Pipeline.java +++ b/server/src/main/java/org/elasticsearch/ingest/Pipeline.java @@ -26,7 +26,6 @@ import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.concurrent.TimeUnit; import java.util.function.BiConsumer; import java.util.function.LongSupplier; @@ -98,8 +97,8 @@ public void execute(IngestDocument ingestDocument, BiConsumer { - long ingestTimeInMillis = TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - startTimeInNanos); - metrics.postIngest(ingestTimeInMillis); + long ingestTimeInNanos = relativeTimeProvider.getAsLong() - startTimeInNanos; + metrics.postIngest(ingestTimeInNanos); if (e != null) { metrics.ingestFailed(); } diff --git a/server/src/test/java/org/elasticsearch/ingest/IngestMetricTests.java b/server/src/test/java/org/elasticsearch/ingest/IngestMetricTests.java new file mode 100644 index 0000000000000..9a1d1ae017b36 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/ingest/IngestMetricTests.java @@ -0,0 +1,45 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.ingest; + +import org.elasticsearch.test.ESTestCase; + +import static org.hamcrest.Matchers.equalTo; + +public class IngestMetricTests extends ESTestCase { + + public void testIngestCurrent() { + IngestMetric metric = new IngestMetric(); + metric.preIngest(); + assertThat(1L, equalTo(metric.createStats().getIngestCurrent())); + metric.postIngest(0); + assertThat(0L, equalTo(metric.createStats().getIngestCurrent())); + } + + public void testIngestTimeInNanos() { + IngestMetric metric = new IngestMetric(); + metric.preIngest(); + metric.postIngest(500000L); + metric.preIngest(); + metric.postIngest(500000L); + assertThat(1L, equalTo(metric.createStats().getIngestTimeInMillis())); + } + +}