Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added initial metrics for synthetic source #106732

Merged
merged 22 commits into from
May 14, 2024
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.common.util;

import java.util.function.Supplier;

/**
* Stores value and provides optional functionality to set up a per-thread override value.
* Intended usage is a singleton value that is commonly accessed from multiple places.
* Having it as singleton allows to not pass instance down to every consumer.
* Thread-local override allows to use different value in tests even though it is a singleton.
* Inspired by <a href="https://docs.rs/tracing/latest/tracing/dispatcher/index.html">tracing</a>.
*/
public class ValueWithThreadLocalOverride<T> {
// Intentionally not static - different values should allow different overrides.
private final ThreadLocal<T> threadLocal = ThreadLocal.withInitial(() -> null);
private Supplier<T> supplier;

public ValueWithThreadLocalOverride(T value) {
this.supplier = () -> value;
}

/**
* returns stored value or an override if set
* @return T
*/
public T get() {
return supplier.get();
}

/**
* Installs a thread-local override value.
* @param value
* @return an {@link AutoCloseable} that removes the override.
*/
public AutoCloseable withOverride(T value) {
threadLocal.set(value);
// This is a small optimization to eliminate thread local lookup
// if override was never set, which is most of the time.
T original = supplier.get();
this.supplier = () -> getWithOverride(original);

return new Reset(threadLocal);
}

private T getWithOverride(T original) {
var local = threadLocal.get();
if (local != null) {
return local;
}

return original;
}

private record Reset(ThreadLocal<?> threadLocal) implements AutoCloseable {
@Override
public void close() throws Exception {
threadLocal.remove();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.index.mapper;

import org.elasticsearch.core.TimeValue;
import org.elasticsearch.telemetry.metric.LongHistogram;
import org.elasticsearch.telemetry.metric.MeterRegistry;

import java.util.function.LongSupplier;

/**
* Contains metrics for operations involving source field.
*/
public class SourceFieldMetrics {
public static final SourceFieldMetrics NOOP = new SourceFieldMetrics(MeterRegistry.NOOP, () -> 0);
lkts marked this conversation as resolved.
Show resolved Hide resolved

public static final String SYNTHETIC_SOURCE_LOAD_LATENCY = "es.mapper.synthetic_source.load.latency.histogram";

private final LongSupplier relativeTimeSupplier;

private final LongHistogram syntheticSourceLoadLatency;

public SourceFieldMetrics(MeterRegistry meterRegistry, LongSupplier relativeTimeSupplier) {
this.syntheticSourceLoadLatency = meterRegistry.registerLongHistogram(
SYNTHETIC_SOURCE_LOAD_LATENCY,
"Time it takes to load fields and construct synthetic source",
"ms"
);
this.relativeTimeSupplier = relativeTimeSupplier;
}

public LongSupplier getRelativeTimeSupplier() {
return relativeTimeSupplier;
}

public void recordSyntheticSourceLoadLatency(TimeValue value) {
this.syntheticSourceLoadLatency.record(value.millis());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@

import org.apache.lucene.index.LeafReader;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.fieldvisitor.LeafStoredFieldLoader;
import org.elasticsearch.indices.MapperMetrics;
import org.elasticsearch.search.lookup.Source;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.json.JsonXContent;
Expand Down Expand Up @@ -104,7 +106,23 @@ public Set<String> requiredStoredFields() {
@Override
public Leaf leaf(LeafReader reader, int[] docIdsInLeaf) throws IOException {
SyntheticFieldLoader loader = syntheticFieldLoaderLeafSupplier.get();
return new SyntheticLeaf(loader, loader.docValuesLoader(reader, docIdsInLeaf));
return new LeafWithMetrics(new SyntheticLeaf(loader, loader.docValuesLoader(reader, docIdsInLeaf)));
}

private record LeafWithMetrics(Leaf leaf) implements Leaf {

@Override
public Source source(LeafStoredFieldLoader storedFields, int docId) throws IOException {
SourceFieldMetrics metrics = MapperMetrics.SOURCE_FIELD_METRICS.get();
long startTime = metrics.getRelativeTimeSupplier().getAsLong();

var source = leaf.source(storedFields, docId);

TimeValue duration = TimeValue.timeValueMillis(metrics.getRelativeTimeSupplier().getAsLong() - startTime);
kkrik-es marked this conversation as resolved.
Show resolved Hide resolved
metrics.recordSyntheticSourceLoadLatency(duration);

return source;
}
}

private static class SyntheticLeaf implements Leaf {
Expand Down
28 changes: 28 additions & 0 deletions server/src/main/java/org/elasticsearch/indices/MapperMetrics.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.indices;

import org.elasticsearch.common.util.ValueWithThreadLocalOverride;
import org.elasticsearch.index.mapper.SourceFieldMetrics;

/**
* Groups together all metrics used in mappers.
* Main purpose of this class is to avoid verbosity of passing individual metric instances around.
*/
public class MapperMetrics {
public static ValueWithThreadLocalOverride<SourceFieldMetrics> SOURCE_FIELD_METRICS = new ValueWithThreadLocalOverride<>(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the use of ValueWithThreadLocalOverride isn't needed? I think ThreadLocal.withInitial(...) can be used here that reads from a static field. Then the test just changes the NOOP static field to point to another instance. Additionally the init method can be changed to update NOOP instance?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If a test changes static field it potentially impacts other concurrent test using this static field. That is the main idea behind ValueWithThreadLocalOverride.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NOOP here is indeed a leftover from previous implementation.

SourceFieldMetrics.NOOP
);

public static void init(SourceFieldMetrics sourceFieldMetrics) {
SOURCE_FIELD_METRICS = new ValueWithThreadLocalOverride<>(sourceFieldMetrics);
}

private MapperMetrics() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,12 @@
import org.elasticsearch.index.IndexSettingProviders;
import org.elasticsearch.index.IndexingPressure;
import org.elasticsearch.index.analysis.AnalysisRegistry;
import org.elasticsearch.index.mapper.SourceFieldMetrics;
import org.elasticsearch.indices.ExecutorSelector;
import org.elasticsearch.indices.IndicesModule;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.IndicesServiceBuilder;
import org.elasticsearch.indices.MapperMetrics;
import org.elasticsearch.indices.ShardLimitValidator;
import org.elasticsearch.indices.SystemIndexMappingUpdateService;
import org.elasticsearch.indices.SystemIndices;
Expand Down Expand Up @@ -717,6 +719,12 @@ private void construct(
);
}

SourceFieldMetrics sourceFieldMetrics = new SourceFieldMetrics(
telemetryProvider.getMeterRegistry(),
threadPool::relativeTimeInMillis
);
MapperMetrics.init(sourceFieldMetrics);

IndicesService indicesService = new IndicesServiceBuilder().settings(settings)
.pluginsService(pluginsService)
.nodeEnvironment(nodeEnvironment)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,13 @@

package org.elasticsearch.index.mapper;

import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.indices.MapperMetrics;
import org.elasticsearch.telemetry.TestTelemetryPlugin;
import org.elasticsearch.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.function.LongSupplier;

import static org.hamcrest.Matchers.equalTo;

Expand Down Expand Up @@ -131,4 +135,34 @@ public void testHideTheCopyTo() {
})));
assertThat(e.getMessage(), equalTo("[copy_to] may not be used to copy from a multi-field: [foo.hidden]"));
}

public void testSyntheticSourceMetrics() throws Exception {
var testTelemetry = new TestTelemetryPlugin();
var sourceFieldMetrics = new SourceFieldMetrics(
testTelemetry.getTelemetryProvider(Settings.EMPTY).getMeterRegistry(),
new LongSupplier() {
private long value = 1;

@Override
public long getAsLong() {
return value++;
}
}
);

DocumentMapper mapper = createDocumentMapper(
syntheticSourceMapping(b -> { b.startObject("kwd").field("type", "keyword").endObject(); })
);
try (var ignored = MapperMetrics.SOURCE_FIELD_METRICS.withOverride(sourceFieldMetrics)) {
assertThat(syntheticSource(mapper, b -> b.field("kwd", "foo")), equalTo("""
{"kwd":"foo"}"""));

var measurements = testTelemetry.getLongHistogramMeasurement(SourceFieldMetrics.SYNTHETIC_SOURCE_LOAD_LATENCY);
// `syntheticSource` above actually performs two loads of source to perform the assertion
assertEquals(2, measurements.size());
// test implementation of time provider always has a gap of 1 between values
assertEquals(measurements.get(0).getLong(), 1);
assertEquals(measurements.get(1).getLong(), 1);
}
}
}