Skip to content

Commit

Permalink
In IngestMetrics, track ingestTime using nanos rather millis (#64182)
Browse files Browse the repository at this point in the history
This allows us to do a single loss-y nanos to millis conversion when
calculating the IndexStats, rather than tracking a running total of
loss-y metrics (since many processes will complete in less than a
millisecond).
  • Loading branch information
joegallo authored Oct 29, 2020
1 parent 29a66a7 commit df89c27
Show file tree
Hide file tree
Showing 6 changed files with 62 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.LongSupplier;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -141,8 +140,8 @@ void innerExecute(int currentProcessor, IngestDocument ingestDocument, BiConsume
final long startTimeInNanos = relativeTimeProvider.getAsLong();
metric.preIngest();
processor.execute(ingestDocument, (result, e) -> {
long ingestTimeInMillis = TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - startTimeInNanos);
metric.postIngest(ingestTimeInMillis);
long ingestTimeInNanos = relativeTimeProvider.getAsLong() - startTimeInNanos;
metric.postIngest(ingestTimeInNanos);

if (e != null) {
metric.ingestFailed();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import java.util.ListIterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.LongSupplier;
Expand Down Expand Up @@ -103,8 +102,8 @@ public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Ex
final long startTimeInNanos = relativeTimeProvider.getAsLong();
metric.preIngest();
processor.execute(ingestDocument, (result, e) -> {
long ingestTimeInMillis = TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - startTimeInNanos);
metric.postIngest(ingestTimeInMillis);
long ingestTimeInNanos = relativeTimeProvider.getAsLong() - startTimeInNanos;
metric.postIngest(ingestTimeInNanos);
if (e != null) {
metric.ingestFailed();
handler.accept(null, e);
Expand Down
16 changes: 9 additions & 7 deletions server/src/main/java/org/elasticsearch/ingest/IngestMetric.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
package org.elasticsearch.ingest;

import org.elasticsearch.common.metrics.CounterMetric;
import org.elasticsearch.common.metrics.MeanMetric;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/**
Expand All @@ -36,7 +36,7 @@ class IngestMetric {
/**
* The time it takes to complete the measured item.
*/
private final MeanMetric ingestTime = new MeanMetric();
private final CounterMetric ingestTimeInNanos = new CounterMetric();
/**
* The current count of things being measure. Should most likely ever be 0 or 1.
* Useful when aggregating multiple metrics to see how many things are in flight.
Expand All @@ -60,11 +60,11 @@ void preIngest() {

/**
* Call this after the performing the ingest action, even if the action failed.
* @param ingestTimeInMillis The time it took to perform the action.
* @param ingestTimeInNanos The time it took to perform the action.
*/
void postIngest(long ingestTimeInMillis) {
void postIngest(long ingestTimeInNanos) {
ingestCurrent.decrementAndGet();
ingestTime.inc(ingestTimeInMillis);
this.ingestTimeInNanos.inc(ingestTimeInNanos);
ingestCount.inc();
}

Expand All @@ -84,14 +84,16 @@ void ingestFailed() {
*/
void add(IngestMetric metrics) {
ingestCount.inc(metrics.ingestCount.count());
ingestTime.inc(metrics.ingestTime.sum());
ingestTimeInNanos.inc(metrics.ingestTimeInNanos.count());
ingestFailed.inc(metrics.ingestFailed.count());
}

/**
* Creates a serializable representation for these metrics.
*/
IngestStats.Stats createStats() {
return new IngestStats.Stats(ingestCount.count(), ingestTime.sum(), ingestCurrent.get(), ingestFailed.count());
// we track ingestTime at nanosecond resolution, but IngestStats uses millisecond resolution for reporting
long ingestTimeInMillis = TimeUnit.NANOSECONDS.toMillis(ingestTimeInNanos.count());
return new IngestStats.Stats(ingestCount.count(), ingestTimeInMillis, ingestCurrent.get(), ingestFailed.count());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
Expand Down Expand Up @@ -654,8 +653,8 @@ private void innerExecute(int slot, IndexRequest indexRequest, Pipeline pipeline
Map<String, Object> sourceAsMap = indexRequest.sourceAsMap();
IngestDocument ingestDocument = new IngestDocument(index, id, routing, version, versionType, sourceAsMap);
ingestDocument.executePipeline(pipeline, (result, e) -> {
long ingestTimeInMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeInNanos);
totalMetrics.postIngest(ingestTimeInMillis);
long ingestTimeInNanos = System.nanoTime() - startTimeInNanos;
totalMetrics.postIngest(ingestTimeInNanos);
if (e != null) {
totalMetrics.ingestFailed();
handler.accept(e);
Expand Down
5 changes: 2 additions & 3 deletions server/src/main/java/org/elasticsearch/ingest/Pipeline.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.LongSupplier;

Expand Down Expand Up @@ -98,8 +97,8 @@ public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Ex
final long startTimeInNanos = relativeTimeProvider.getAsLong();
metrics.preIngest();
compoundProcessor.execute(ingestDocument, (result, e) -> {
long ingestTimeInMillis = TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - startTimeInNanos);
metrics.postIngest(ingestTimeInMillis);
long ingestTimeInNanos = relativeTimeProvider.getAsLong() - startTimeInNanos;
metrics.postIngest(ingestTimeInNanos);
if (e != null) {
metrics.ingestFailed();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.ingest;

import org.elasticsearch.test.ESTestCase;

import static org.hamcrest.Matchers.equalTo;

public class IngestMetricTests extends ESTestCase {

public void testIngestCurrent() {
IngestMetric metric = new IngestMetric();
metric.preIngest();
assertThat(1L, equalTo(metric.createStats().getIngestCurrent()));
metric.postIngest(0);
assertThat(0L, equalTo(metric.createStats().getIngestCurrent()));
}

public void testIngestTimeInNanos() {
IngestMetric metric = new IngestMetric();
metric.preIngest();
metric.postIngest(500000L);
metric.preIngest();
metric.postIngest(500000L);
assertThat(1L, equalTo(metric.createStats().getIngestTimeInMillis()));
}

}

0 comments on commit df89c27

Please sign in to comment.