Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Basic thread safety for ValuesSourceRegistry #50340

20 changes: 20 additions & 0 deletions server/src/main/java/org/elasticsearch/plugins/SearchPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.elasticsearch.search.aggregations.bucket.significant.SignificantTerms;
import org.elasticsearch.search.aggregations.bucket.significant.heuristics.SignificanceHeuristic;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry;
import org.elasticsearch.search.fetch.FetchSubPhase;
import org.elasticsearch.search.fetch.subphase.highlight.Highlighter;
import org.elasticsearch.search.rescore.Rescorer;
Expand All @@ -54,6 +55,7 @@
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BiFunction;
import java.util.function.Consumer;

import static java.util.Collections.emptyList;
import static java.util.Collections.emptyMap;
Expand Down Expand Up @@ -250,6 +252,7 @@ public QuerySpec(String name, Writeable.Reader<T> reader, QueryParser<T> parser)
*/
class AggregationSpec extends SearchExtensionSpec<AggregationBuilder, Aggregator.Parser> {
private final Map<String, Writeable.Reader<? extends InternalAggregation>> resultReaders = new TreeMap<>();
private Consumer<ValuesSourceRegistry> aggregatorRegistrar;

/**
* Specification for an {@link Aggregation}.
Expand Down Expand Up @@ -300,6 +303,23 @@ public AggregationSpec addResultReader(String writeableName, Writeable.Reader<?
public Map<String, Writeable.Reader<? extends InternalAggregation>> getResultReaders() {
return resultReaders;
}

/**
* Get the function to register the {@link org.elasticsearch.search.aggregations.support.ValuesSource} to aggregator mappings for
* this aggregation
*/
public Consumer<ValuesSourceRegistry> getAggregatorRegistrar() {
return aggregatorRegistrar;
}

/**
* Set the function to register the {@link org.elasticsearch.search.aggregations.support.ValuesSource} to aggregator mappings for
* this aggregation
*/
public AggregationSpec setAggregatorRegistrar(Consumer<ValuesSourceRegistry> aggregatorRegistrar) {
this.aggregatorRegistrar = aggregatorRegistrar;
return this;
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@
import org.elasticsearch.search.aggregations.pipeline.StatsBucketPipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.SumBucketPipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.SumBucketPipelineAggregator;
import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry;
import org.elasticsearch.search.fetch.FetchPhase;
import org.elasticsearch.search.fetch.FetchSubPhase;
import org.elasticsearch.search.fetch.subphase.DocValueFieldsFetchSubPhase;
Expand Down Expand Up @@ -400,7 +401,8 @@ private void registerAggregations(List<SearchPlugin> plugins) {
registerAggregation(new AggregationSpec(IpRangeAggregationBuilder.NAME, IpRangeAggregationBuilder::new,
IpRangeAggregationBuilder::parse).addResultReader(InternalBinaryRange::new));
registerAggregation(new AggregationSpec(HistogramAggregationBuilder.NAME, HistogramAggregationBuilder::new,
HistogramAggregationBuilder::parse).addResultReader(InternalHistogram::new));
HistogramAggregationBuilder::parse).addResultReader(InternalHistogram::new)
.setAggregatorRegistrar(HistogramAggregationBuilder::registerAggregators));
registerAggregation(new AggregationSpec(DateHistogramAggregationBuilder.NAME, DateHistogramAggregationBuilder::new,
DateHistogramAggregationBuilder::parse).addResultReader(InternalDateHistogram::new));
registerAggregation(new AggregationSpec(AutoDateHistogramAggregationBuilder.NAME, AutoDateHistogramAggregationBuilder::new,
Expand Down Expand Up @@ -440,6 +442,10 @@ private void registerAggregation(AggregationSpec spec) {
Writeable.Reader<? extends InternalAggregation> internalReader = t.getValue();
namedWriteables.add(new NamedWriteableRegistry.Entry(InternalAggregation.class, writeableName, internalReader));
}
Consumer<ValuesSourceRegistry> register = spec.getAggregatorRegistrar();
if (register != null) {
register.accept(ValuesSourceRegistry.getInstance());
}
}

private void registerPipelineAggregations(List<SearchPlugin> plugins) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,14 @@
import org.elasticsearch.search.aggregations.support.ValuesSourceAggregatorFactory;
import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
import org.elasticsearch.search.aggregations.support.ValuesSourceParserHelper;
import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry;
import org.elasticsearch.search.aggregations.support.ValuesSourceType;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* A builder for histograms on numeric fields. This builder can operate on either base numeric fields, or numeric range fields. IP range
Expand Down Expand Up @@ -88,6 +90,13 @@ public static HistogramAggregationBuilder parse(String aggregationName, XContent
return PARSER.parse(parser, new HistogramAggregationBuilder(aggregationName), null);
}

private static AtomicBoolean wasRegistered = new AtomicBoolean(false);
public static void registerAggregators(ValuesSourceRegistry valuesSourceRegistry) {
if (wasRegistered.compareAndSet(false, true) == true) {
HistogramAggregatorFactory.registerAggregators(valuesSourceRegistry);
}
}

private double interval;
private double offset = 0;
private double minBound = Double.POSITIVE_INFINITY;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ public final class HistogramAggregatorFactory extends ValuesSourceAggregatorFact
private final double minBound, maxBound;

// TODO: Registration should happen on the actual aggregator classes, but I don't want to set up the whole dynamic loading thing yet
static {
ValuesSourceRegistry.getInstance().register(HistogramAggregationBuilder.NAME, CoreValuesSourceType.RANGE,
static void registerAggregators(ValuesSourceRegistry valuesSourceRegistry) {
valuesSourceRegistry.register(HistogramAggregationBuilder.NAME, CoreValuesSourceType.RANGE,
new HistogramAggregatorSupplier() {
@Override
public Aggregator build(String name, AggregatorFactories factories, double interval, double offset,
Expand All @@ -76,7 +76,7 @@ public Aggregator build(String name, AggregatorFactories factories, double inter
(fieldType, indexFieldData) -> fieldType instanceof RangeFieldMapper.RangeFieldType
);

ValuesSourceRegistry.getInstance().register(HistogramAggregationBuilder.NAME, CoreValuesSourceType.NUMERIC,
valuesSourceRegistry.register(HistogramAggregationBuilder.NAME, CoreValuesSourceType.NUMERIC,
new HistogramAggregatorSupplier() {
@Override
public Aggregator build(String name, AggregatorFactories factories, double interval, double offset,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,9 @@
import org.elasticsearch.search.aggregations.AggregationExecutionException;

import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;
import java.util.function.BiFunction;

/*
Expand All @@ -39,49 +38,76 @@
*/
public enum ValuesSourceRegistry {
INSTANCE {
Map<String, Map<ValuesSourceType, AggregatorSupplier>> aggregatorRegistry = new HashMap<>();
Map<String, Map<ValuesSourceType, AggregatorSupplier>> aggregatorRegistry = Map.of();
// We use a List of Entries here to approximate an ordered map
Map<String, List<AbstractMap.SimpleEntry<BiFunction<MappedFieldType, IndexFieldData, Boolean>, ValuesSourceType>>> resolverRegistry
= new HashMap<>();
= Map.of();

/**
* Threading behavior notes: This call is both synchronized and expensive. It copies the entire existing mapping structure each
* time it is invoked. We expect that register will be called a small number of times during startup only (as plugins are being
* registered) and we can tolerate the cost at that time. Once all plugins are registered, we should never need to call register
* again. Comparatively, we expect to do many reads from the registry data structures, and those reads may be interleaved on
* different worker threads. Thus we want to optimize the read case to be thread safe and fast, which the immutable
* collections do well. Using immutable collections requires a copy on write mechanic, thus the somewhat non-intuitive
* implementation of this method.
*
* @param aggregationName The name of the family of aggregations, typically found via ValuesSourceAggregationBuilder.getType()
* @param valuesSourceType The ValuesSourceType this mapping applies to.
* @param aggregatorSupplier An Aggregation-specific specialization of AggregatorSupplier which will construct the mapped aggregator
* from the aggregation standard set of parameters
* @param resolveValuesSourceType A predicate operating on MappedFieldType and IndexFieldData instances which decides if the mapped
*/
@Override
public void register(String aggregationName, ValuesSourceType valuesSourceType,AggregatorSupplier aggregatorSupplier,
public synchronized void register(String aggregationName, ValuesSourceType valuesSourceType, AggregatorSupplier aggregatorSupplier,
BiFunction<MappedFieldType, IndexFieldData, Boolean> resolveValuesSourceType) {
if (resolverRegistry.containsKey(aggregationName) == false) {
resolverRegistry.put(aggregationName, new ArrayList<>());
// Aggregator registry block - do this first in case we need to throw on duplicate registration
Map<ValuesSourceType, AggregatorSupplier> innerMap;
if (aggregatorRegistry.containsKey(aggregationName)) {
if (aggregatorRegistry.get(aggregationName).containsKey(valuesSourceType)) {
throw new IllegalStateException("Attempted to register already registered pair [" + aggregationName + ", "
+ valuesSourceType.toString() + "]");
}
innerMap = copyAndAdd(aggregatorRegistry.get(aggregationName),
new AbstractMap.SimpleEntry<>(valuesSourceType, aggregatorSupplier));
} else {
innerMap = Map.of(valuesSourceType, aggregatorSupplier);
}
List<AbstractMap.SimpleEntry<BiFunction<MappedFieldType, IndexFieldData, Boolean>, ValuesSourceType>> resolverList
= resolverRegistry.get(aggregationName);
resolverList.add(new AbstractMap.SimpleEntry<>(resolveValuesSourceType, valuesSourceType));
aggregatorRegistry = copyAndAdd(aggregatorRegistry, new AbstractMap.SimpleEntry<>(aggregationName, innerMap));

if (aggregatorRegistry.containsKey(aggregationName) == false) {
aggregatorRegistry.put(aggregationName, new HashMap<>());
}
Map<ValuesSourceType, AggregatorSupplier> innerMap = aggregatorRegistry.get(aggregationName);
if (innerMap.containsKey(valuesSourceType)) {
throw new IllegalStateException("Attempted to register already registered pair [" + aggregationName + ", "
+ valuesSourceType.toString() + "]");
// Resolver registry block
AbstractMap.SimpleEntry[] mappings;
if (resolverRegistry.containsKey(aggregationName)) {
List currentMappings = resolverRegistry.get(aggregationName);
mappings = (AbstractMap.SimpleEntry[]) currentMappings.toArray(new AbstractMap.SimpleEntry[currentMappings.size() + 1]);
} else {
mappings = new AbstractMap.SimpleEntry[1];
}
innerMap.put(valuesSourceType, aggregatorSupplier);
mappings[mappings.length - 1] = new AbstractMap.SimpleEntry<>(resolveValuesSourceType, valuesSourceType);
resolverRegistry = copyAndAdd(resolverRegistry,new AbstractMap.SimpleEntry<>(aggregationName, List.of(mappings)));
}

@Override
public AggregatorSupplier getAggregator(ValuesSourceType valuesSourceType, String aggregationName) {
if (aggregatorRegistry.containsKey(aggregationName)) {
StringJoiner validSourceTypes = new StringJoiner(",", "[", "]");
if (aggregationName != null && aggregatorRegistry.containsKey(aggregationName)) {
Map<ValuesSourceType, AggregatorSupplier> innerMap = aggregatorRegistry.get(aggregationName);
if (innerMap.containsKey(valuesSourceType)) {
if (valuesSourceType != null && innerMap.containsKey(valuesSourceType)) {
return innerMap.get(valuesSourceType);
}
for (ValuesSourceType validVST : innerMap.keySet()) {
validSourceTypes.add(validVST.toString());
}
throw new AggregationExecutionException("ValuesSource type " + valuesSourceType.toString() +
" is not supported for aggregation" + aggregationName + ". Valid choices are " + validSourceTypes.toString());
}
// TODO: Error message should list valid ValuesSource types
throw new AggregationExecutionException("ValuesSource type " + valuesSourceType.toString() +
" is not supported for aggregation" + aggregationName);
throw new AggregationExecutionException("Unregistered Aggregation [" + aggregationName + "]");
}

@Override
public ValuesSourceType getValuesSourceType(MappedFieldType fieldType, IndexFieldData indexFieldData, String aggregationName,
ValueType valueType) {
if (resolverRegistry.containsKey(aggregationName)) {
if (aggregationName != null && resolverRegistry.containsKey(aggregationName)) {
List<AbstractMap.SimpleEntry<BiFunction<MappedFieldType, IndexFieldData, Boolean>, ValuesSourceType>> resolverList
= resolverRegistry.get(aggregationName);
for (AbstractMap.SimpleEntry<BiFunction<MappedFieldType, IndexFieldData, Boolean>, ValuesSourceType> entry : resolverList) {
Expand All @@ -90,8 +116,9 @@ public ValuesSourceType getValuesSourceType(MappedFieldType fieldType, IndexFiel
return entry.getValue();
}
}
// TODO: Error message should list valid field types; not sure fieldType.toString() is the best choice.
throw new IllegalArgumentException("Field type " + fieldType.toString() + " is not supported for aggregation "
// TODO: Error message should list valid field types
String fieldDescription = fieldType.name() + "(" + fieldType.toString() + ")";
throw new IllegalArgumentException("Field type " + fieldDescription + " is not supported for aggregation "
+ aggregationName);
} else {
// TODO: Legacy resolve logic; remove this after converting all aggregations to the new system
Expand Down Expand Up @@ -133,4 +160,26 @@ public abstract ValuesSourceType getValuesSourceType(MappedFieldType fieldType,
ValueType valueType);

public static ValuesSourceRegistry getInstance() {return INSTANCE;}

private static <K, V> Map copyAndAdd(Map<K, V> source, Map.Entry<K, V> newValue) {
Map.Entry[] entries;
if (source.containsKey(newValue.getKey())) {
// Replace with new value
entries = new Map.Entry[source.size()];
int i = 0;
for (Map.Entry entry : source.entrySet()) {
if (entry.getKey() == newValue.getKey()) {
entries[i] = newValue;
} else {
entries[i] = entry;
}
i++;
}
} else {
entries = source.entrySet().toArray(new Map.Entry[source.size() + 1]);
entries[entries.length - 1] = newValue;
}
return Map.ofEntries(entries);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
import org.elasticsearch.mock.orig.Mockito;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.SearchModule;
import org.elasticsearch.search.aggregations.MultiBucketConsumerService.MultiBucketConsumer;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.fetch.FetchPhase;
Expand All @@ -76,6 +77,7 @@
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.InternalAggregationTestCase;
import org.junit.After;
import org.junit.BeforeClass;

import java.io.IOException;
import java.util.ArrayList;
Expand Down Expand Up @@ -123,8 +125,11 @@ private static void registerFieldTypes(SearchContext searchContext, MapperServic
when(mapperService.fullName(fieldName)).thenReturn(fieldType);
when(searchContext.smartNameFieldType(fieldName)).thenReturn(fieldType);
}
}


@BeforeClass
public static void initValuesSourceRegistry() {
new SearchModule(Settings.EMPTY, List.of());
}

protected <A extends Aggregator> A createAggregator(AggregationBuilder aggregationBuilder,
Expand Down