Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bulk merge field-caps responses using mapping hash #86323

Merged
merged 16 commits into from
Nov 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/86323.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 86323
summary: Bulk merge field-caps responses using mapping hash
area: Search
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,10 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.IntStream;

import static java.util.Collections.singletonList;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
Expand All @@ -70,6 +72,8 @@
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.nullValue;

public class FieldCapabilitiesIT extends ESIntegTestCase {

Expand Down Expand Up @@ -575,6 +579,67 @@ public void testRelocation() throws Exception {
}
}

public void testManyIndicesWithSameMapping() {
final String mapping = """
{
"properties": {
"message_field": { "type": "text" },
"value_field": { "type": "long" },
"multi_field" : { "type" : "ip", "fields" : { "keyword" : { "type" : "keyword" } } },
"timestamp": {"type": "date"}
}
}
""";
String[] indices = IntStream.range(0, between(1, 9)).mapToObj(n -> "test_many_index_" + n).toArray(String[]::new);
for (String index : indices) {
assertAcked(client().admin().indices().prepareCreate(index).setMapping(mapping).get());
}
FieldCapabilitiesRequest request = new FieldCapabilitiesRequest();
request.indices("test_many_index_*");
request.fields("*");
boolean excludeMultiField = randomBoolean();
if (excludeMultiField) {
request.filters("-multifield");
}
Consumer<FieldCapabilitiesResponse> verifyResponse = resp -> {
assertThat(resp.getIndices(), equalTo(indices));
assertThat(resp.getField("message_field"), hasKey("text"));
assertThat(resp.getField("message_field").get("text").indices(), nullValue());
assertTrue(resp.getField("message_field").get("text").isSearchable());
assertFalse(resp.getField("message_field").get("text").isAggregatable());

assertThat(resp.getField("value_field"), hasKey("long"));
assertThat(resp.getField("value_field").get("long").indices(), nullValue());
assertTrue(resp.getField("value_field").get("long").isSearchable());
assertTrue(resp.getField("value_field").get("long").isAggregatable());

assertThat(resp.getField("timestamp"), hasKey("date"));

assertThat(resp.getField("multi_field"), hasKey("ip"));
if (excludeMultiField) {
assertThat(resp.getField("multi_field.keyword"), not(hasKey("keyword")));
} else {
assertThat(resp.getField("multi_field.keyword"), hasKey("keyword"));
}
};
// Single mapping
verifyResponse.accept(client().execute(FieldCapabilitiesAction.INSTANCE, request).actionGet());

// add an extra field for some indices
String[] indicesWithExtraField = randomSubsetOf(between(1, indices.length), indices).stream().sorted().toArray(String[]::new);
ensureGreen(indices);
assertAcked(client().admin().indices().preparePutMapping(indicesWithExtraField).setSource("extra_field", "type=integer").get());
for (String index : indicesWithExtraField) {
client().prepareIndex(index).setSource("extra_field", randomIntBetween(1, 1000)).get();
}
FieldCapabilitiesResponse resp = client().execute(FieldCapabilitiesAction.INSTANCE, request).actionGet();
verifyResponse.accept(resp);
assertThat(resp.getField("extra_field"), hasKey("integer"));
assertThat(resp.getField("extra_field").get("integer").indices(), nullValue());
assertTrue(resp.getField("extra_field").get("integer").isSearchable());
assertTrue(resp.getField("extra_field").get("integer").isAggregatable());
}

private void assertIndices(FieldCapabilitiesResponse response, String... indices) {
assertNotNull(response.getIndices());
Arrays.sort(indices);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -486,106 +486,125 @@ static class Builder {
private int dimensionIndices = 0;
private TimeSeriesParams.MetricType metricType;
private boolean hasConflictMetricType;
private final List<IndexCaps> indiceList;
private final List<IndexCaps> indicesList;
private final Map<String, Set<String>> meta;
private int totalIndices;

Builder(String name, String type) {
this.name = name;
this.type = type;
this.metricType = null;
this.hasConflictMetricType = false;
this.indiceList = new ArrayList<>();
this.indicesList = new ArrayList<>();
this.meta = new HashMap<>();
}

private boolean assertIndicesSorted(String[] indices) {
for (int i = 1; i < indices.length; i++) {
assert indices[i - 1].compareTo(indices[i]) < 0 : "indices [" + Arrays.toString(indices) + "] aren't sorted";
}
if (indicesList.isEmpty() == false) {
final IndexCaps lastCaps = indicesList.get(indicesList.size() - 1);
final String lastIndex = lastCaps.indices[lastCaps.indices.length - 1];
assert lastIndex.compareTo(indices[0]) < 0
: "indices aren't sorted; previous [" + lastIndex + "], current [" + indices[0] + "]";
}
return true;
}

/**
* Collect the field capabilities for an index.
*/
void add(
String index,
String[] indices,
boolean isMetadataField,
boolean search,
boolean agg,
boolean isDimension,
TimeSeriesParams.MetricType metricType,
Map<String, String> meta
) {
assert indiceList.isEmpty() || indiceList.get(indiceList.size() - 1).name.compareTo(index) < 0
: "indices aren't sorted; previous [" + indiceList.get(indiceList.size() - 1).name + "], current [" + index + "]";
assert assertIndicesSorted(indices);
totalIndices += indices.length;
if (search) {
searchableIndices++;
searchableIndices += indices.length;
}
if (agg) {
aggregatableIndices++;
aggregatableIndices += indices.length;
}
if (isDimension) {
dimensionIndices++;
dimensionIndices += indices.length;
}
this.isMetadataField |= isMetadataField;
// If we have discrepancy in metric types or in some indices this field is not marked as a metric field - we will
// treat is a non-metric field and report this discrepancy in metricConflictsIndices
if (indiceList.isEmpty()) {
if (indicesList.isEmpty()) {
this.metricType = metricType;
} else if (this.metricType != metricType) {
hasConflictMetricType = true;
this.metricType = null;
}
IndexCaps indexCaps = new IndexCaps(index, search, agg, isDimension, metricType);
indiceList.add(indexCaps);
indicesList.add(new IndexCaps(indices, search, agg, isDimension, metricType));
for (Map.Entry<String, String> entry : meta.entrySet()) {
this.meta.computeIfAbsent(entry.getKey(), key -> new HashSet<>()).add(entry.getValue());
}
}

Stream<String> getIndices() {
return indiceList.stream().map(c -> c.name);
return indicesList.stream().flatMap(c -> Arrays.stream(c.indices));
}

private String[] getNonFeatureIndices(boolean featureInAll, int featureIndices, Predicate<IndexCaps> hasFeature) {
if (featureInAll || featureIndices == 0) {
return null;
}
String[] nonFeatureIndices = new String[indiceList.size() - featureIndices];
private String[] filterIndices(int length, Predicate<IndexCaps> pred) {
int index = 0;
for (IndexCaps indexCaps : indiceList) {
if (hasFeature.test(indexCaps) == false) {
nonFeatureIndices[index++] = indexCaps.name;
final String[] dst = new String[length];
for (IndexCaps indexCaps : indicesList) {
if (pred.test(indexCaps)) {
System.arraycopy(indexCaps.indices, 0, dst, index, indexCaps.indices.length);
index += indexCaps.indices.length;
}
}
return nonFeatureIndices;
assert index == length : index + "!=" + length;
return dst;
}

FieldCapabilities build(boolean withIndices) {
final String[] indices;
if (withIndices) {
indices = indiceList.stream().map(caps -> caps.name).toArray(String[]::new);
} else {
indices = null;
}
final String[] indices = withIndices ? filterIndices(totalIndices, ic -> true) : null;

// Iff this field is searchable in some indices AND non-searchable in others
// we record the list of non-searchable indices
boolean isSearchable = searchableIndices == indiceList.size();
String[] nonSearchableIndices = getNonFeatureIndices(isSearchable, searchableIndices, IndexCaps::isSearchable);
final boolean isSearchable = searchableIndices == totalIndices;
final String[] nonSearchableIndices;
if (isSearchable || searchableIndices == 0) {
nonSearchableIndices = null;
} else {
nonSearchableIndices = filterIndices(totalIndices - searchableIndices, ic -> ic.isSearchable == false);
}

// Iff this field is aggregatable in some indices AND non-aggregatable in others
// we keep the list of non-aggregatable indices
boolean isAggregatable = aggregatableIndices == indiceList.size();
String[] nonAggregatableIndices = getNonFeatureIndices(isAggregatable, aggregatableIndices, IndexCaps::isAggregatable);
final boolean isAggregatable = aggregatableIndices == totalIndices;
final String[] nonAggregatableIndices;
if (isAggregatable || aggregatableIndices == 0) {
nonAggregatableIndices = null;
} else {
nonAggregatableIndices = filterIndices(totalIndices - aggregatableIndices, ic -> ic.isAggregatable == false);
}

// Collect all indices that have dimension == false if this field is marked as a dimension in at least one index
boolean isDimension = dimensionIndices == indiceList.size();
String[] nonDimensionIndices = getNonFeatureIndices(isDimension, dimensionIndices, IndexCaps::isDimension);
final boolean isDimension = dimensionIndices == totalIndices;
final String[] nonDimensionIndices;
if (isDimension || dimensionIndices == 0) {
nonDimensionIndices = null;
} else {
nonDimensionIndices = filterIndices(totalIndices - dimensionIndices, ic -> ic.isDimension == false);
}

final String[] metricConflictsIndices;
if (hasConflictMetricType) {
// Collect all indices that have this field. If it is marked differently in different indices, we cannot really
// make a decisions which index is "right" and which index is "wrong" so collecting all indices where this field
// is present is probably the only sensible thing to do here
metricConflictsIndices = Objects.requireNonNullElseGet(
indices,
() -> indiceList.stream().map(caps -> caps.name).toArray(String[]::new)
);
metricConflictsIndices = Objects.requireNonNullElseGet(indices, () -> filterIndices(totalIndices, ic -> true));
} else {
metricConflictsIndices = null;
}
Expand Down Expand Up @@ -613,7 +632,7 @@ FieldCapabilities build(boolean withIndices) {
}

private record IndexCaps(
String name,
String[] indices,
boolean isSearchable,
boolean isAggregatable,
boolean isDimension,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.elasticsearch.action.fieldcaps;

import org.apache.lucene.util.ArrayUtil;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
Expand Down Expand Up @@ -36,14 +37,15 @@
import org.elasticsearch.transport.TransportService;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -114,12 +116,12 @@ protected void doExecute(Task task, FieldCapabilitiesRequest request, final Acti

final Map<String, FieldCapabilitiesIndexResponse> indexResponses = Collections.synchronizedMap(new HashMap<>());
// This map is used to share the index response for indices which have the same index mapping hash to reduce the memory usage.
final Map<String, Map<String, IndexFieldCapabilities>> indexMappingHashToResponses = Collections.synchronizedMap(new HashMap<>());
final Map<String, FieldCapabilitiesIndexResponse> indexMappingHashToResponses = Collections.synchronizedMap(new HashMap<>());
final Consumer<FieldCapabilitiesIndexResponse> handleIndexResponse = resp -> {
if (resp.canMatch() && resp.getIndexMappingHash() != null) {
Map<String, IndexFieldCapabilities> curr = indexMappingHashToResponses.putIfAbsent(resp.getIndexMappingHash(), resp.get());
FieldCapabilitiesIndexResponse curr = indexMappingHashToResponses.putIfAbsent(resp.getIndexMappingHash(), resp);
if (curr != null) {
resp = new FieldCapabilitiesIndexResponse(resp.getIndexName(), resp.getIndexMappingHash(), curr, true);
resp = new FieldCapabilitiesIndexResponse(resp.getIndexName(), curr.getIndexMappingHash(), curr.get(), true);
}
}
indexResponses.putIfAbsent(resp.getIndexName(), resp);
Expand Down Expand Up @@ -228,15 +230,35 @@ private static FieldCapabilitiesRequest prepareRemoteRequest(
return remoteRequest;
}

private static boolean hasSameMappingHash(FieldCapabilitiesIndexResponse r1, FieldCapabilitiesIndexResponse r2) {
return r1.getIndexMappingHash() != null
&& r2.getIndexMappingHash() != null
&& r1.getIndexMappingHash().equals(r2.getIndexMappingHash());
}

private FieldCapabilitiesResponse merge(
Map<String, FieldCapabilitiesIndexResponse> indexResponsesMap,
FieldCapabilitiesRequest request,
List<FieldCapabilitiesFailure> failures
) {
Map<String, FieldCapabilitiesIndexResponse> responses = new TreeMap<>(indexResponsesMap);
Map<String, Map<String, FieldCapabilities.Builder>> responseMapBuilder = new HashMap<>();
for (FieldCapabilitiesIndexResponse response : responses.values()) {
innerMerge(responseMapBuilder, request, response);
final FieldCapabilitiesIndexResponse[] indexResponses = indexResponsesMap.values()
.stream()
.sorted(Comparator.comparing(FieldCapabilitiesIndexResponse::getIndexName))
.toArray(FieldCapabilitiesIndexResponse[]::new);
final String[] indices = Arrays.stream(indexResponses).map(FieldCapabilitiesIndexResponse::getIndexName).toArray(String[]::new);
final Map<String, Map<String, FieldCapabilities.Builder>> responseMapBuilder = new HashMap<>();
int lastPendingIndex = 0;
for (int i = 1; i <= indexResponses.length; i++) {
if (i == indexResponses.length || hasSameMappingHash(indexResponses[lastPendingIndex], indexResponses[i]) == false) {
final String[] subIndices;
if (lastPendingIndex == 0 && i == indexResponses.length) {
subIndices = indices;
} else {
subIndices = ArrayUtil.copyOfSubArray(indices, lastPendingIndex, i);
}
innerMerge(subIndices, responseMapBuilder, request, indexResponses[lastPendingIndex]);
lastPendingIndex = i;
}
}

Map<String, Map<String, FieldCapabilities>> responseMap = new HashMap<>();
Expand All @@ -247,7 +269,7 @@ private FieldCapabilitiesResponse merge(
if (request.includeUnmapped()) {
// do this directly, rather than using the builder, to save creating a whole lot of objects we don't need
unmapped = getUnmappedFields(
responses.keySet(),
indexResponsesMap.keySet(),
entry.getKey(),
typeMapBuilder.values().stream().flatMap(FieldCapabilities.Builder::getIndices).collect(Collectors.toSet())
);
Expand All @@ -266,7 +288,7 @@ private FieldCapabilitiesResponse merge(
)
);
}
return new FieldCapabilitiesResponse(responses.keySet().toArray(String[]::new), Collections.unmodifiableMap(responseMap), failures);
return new FieldCapabilitiesResponse(indices, Collections.unmodifiableMap(responseMap), failures);
}

private static Optional<Function<Boolean, FieldCapabilities>> getUnmappedFields(
Expand All @@ -287,6 +309,7 @@ private static Optional<Function<Boolean, FieldCapabilities>> getUnmappedFields(
}

private void innerMerge(
String[] indices,
Map<String, Map<String, FieldCapabilities.Builder>> responseMapBuilder,
FieldCapabilitiesRequest request,
FieldCapabilitiesIndexResponse response
Expand All @@ -306,7 +329,7 @@ private void innerMerge(
key -> new FieldCapabilities.Builder(field, key)
);
builder.add(
response.getIndexName(),
indices,
fieldCap.isMetadatafield(),
fieldCap.isSearchable(),
fieldCap.isAggregatable(),
Expand Down
Loading