Skip to content

Commit

Permalink
Group field caps response by index mapping hash (elastic#83494)
Browse files Browse the repository at this point in the history
This commit utilizes the index mapping hash to share the fields-caps for 
indices with the same index mapping to reduce the memory usage and the
size of transport messages.

Closes elastic#78665
Closes elastic#82879
  • Loading branch information
dnhatn authored Feb 17, 2022
1 parent acf9968 commit 69e898d
Show file tree
Hide file tree
Showing 9 changed files with 302 additions and 71 deletions.
7 changes: 7 additions & 0 deletions docs/changelog/83494.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
pr: 83494
summary: Group field caps response by index mapping hash
area: Search
type: enhancement
issues:
- 78665
- 82879
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.elasticsearch.action.fieldcaps;

import org.elasticsearch.cluster.metadata.MappingMetadata;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.mapper.MappedFieldType;
Expand Down Expand Up @@ -37,6 +38,7 @@
*/
class FieldCapabilitiesFetcher {
private final IndicesService indicesService;
private final Map<String, Map<String, IndexFieldCapabilities>> indexMappingHashToResponses = new HashMap<>();

FieldCapabilitiesFetcher(IndicesService indicesService) {
this.indicesService = indicesService;
Expand Down Expand Up @@ -65,17 +67,34 @@ FieldCapabilitiesIndexResponse fetch(
);

if (canMatchShard(shardId, indexFilter, nowInMillis, searchExecutionContext) == false) {
return new FieldCapabilitiesIndexResponse(shardId.getIndexName(), Collections.emptyMap(), false);
return new FieldCapabilitiesIndexResponse(shardId.getIndexName(), null, Collections.emptyMap(), false);
}

Predicate<String> fieldPredicate = indicesService.getFieldFilter().apply(shardId.getIndexName());
final MappingMetadata mapping = indexService.getMetadata().mapping();
final String indexMappingHash = mapping != null ? mapping.getSha256() : null;
if (indexMappingHash != null) {
final Map<String, IndexFieldCapabilities> existing = indexMappingHashToResponses.get(indexMappingHash);
if (existing != null) {
return new FieldCapabilitiesIndexResponse(shardId.getIndexName(), indexMappingHash, existing, true);
}
}

return retrieveFieldCaps(shardId.getIndexName(), searchExecutionContext, fieldPatterns, filters, fieldTypes, fieldPredicate);
Predicate<String> fieldPredicate = indicesService.getFieldFilter().apply(shardId.getIndexName());
final Map<String, IndexFieldCapabilities> responseMap = retrieveFieldCaps(
searchExecutionContext,
fieldPatterns,
filters,
fieldTypes,
fieldPredicate
);
if (indexMappingHash != null) {
indexMappingHashToResponses.put(indexMappingHash, responseMap);
}
return new FieldCapabilitiesIndexResponse(shardId.getIndexName(), indexMappingHash, responseMap, true);
}
}

public static FieldCapabilitiesIndexResponse retrieveFieldCaps(
String indexName,
static Map<String, IndexFieldCapabilities> retrieveFieldCaps(
SearchExecutionContext context,
String[] fieldPatterns,
String[] filters,
Expand Down Expand Up @@ -141,7 +160,7 @@ public static FieldCapabilitiesIndexResponse retrieveFieldCaps(
}
}
}
return new FieldCapabilitiesIndexResponse(indexName, responseMap, true);
return responseMap;
}

private static boolean checkIncludeParents(String[] filters) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,34 +9,113 @@
package org.elasticsearch.action.fieldcaps;

import org.elasticsearch.Version;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.core.Nullable;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class FieldCapabilitiesIndexResponse implements Writeable {
private static final Version MAPPING_HASH_VERSION = Version.V_8_2_0;

public class FieldCapabilitiesIndexResponse extends ActionResponse implements Writeable {
private final String indexName;
@Nullable
private final String indexMappingHash;
private final Map<String, IndexFieldCapabilities> responseMap;
private final boolean canMatch;
private final transient Version originVersion;

FieldCapabilitiesIndexResponse(String indexName, Map<String, IndexFieldCapabilities> responseMap, boolean canMatch) {
FieldCapabilitiesIndexResponse(
String indexName,
@Nullable String indexMappingHash,
Map<String, IndexFieldCapabilities> responseMap,
boolean canMatch
) {
this.indexName = indexName;
this.indexMappingHash = indexMappingHash;
this.responseMap = responseMap;
this.canMatch = canMatch;
this.originVersion = Version.CURRENT;
}

FieldCapabilitiesIndexResponse(StreamInput in) throws IOException {
super(in);
this.indexName = in.readString();
this.responseMap = in.readMap(StreamInput::readString, IndexFieldCapabilities::new);
this.canMatch = in.readBoolean();
this.originVersion = in.getVersion();
if (in.getVersion().onOrAfter(MAPPING_HASH_VERSION)) {
this.indexMappingHash = in.readOptionalString();
} else {
this.indexMappingHash = null;
}
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(indexName);
out.writeMap(responseMap, StreamOutput::writeString, (valueOut, fc) -> fc.writeTo(valueOut));
out.writeBoolean(canMatch);
if (out.getVersion().onOrAfter(MAPPING_HASH_VERSION)) {
out.writeOptionalString(indexMappingHash);
}
}

private record GroupByMappingHash(List<String> indices, String indexMappingHash, Map<String, IndexFieldCapabilities> responseMap)
implements
Writeable {
GroupByMappingHash(StreamInput in) throws IOException {
this(in.readStringList(), in.readString(), in.readMap(StreamInput::readString, IndexFieldCapabilities::new));
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeStringCollection(indices);
out.writeString(indexMappingHash);
out.writeMap(responseMap, StreamOutput::writeString, (valueOut, fc) -> fc.writeTo(valueOut));
}

List<FieldCapabilitiesIndexResponse> getResponses() {
return indices.stream().map(index -> new FieldCapabilitiesIndexResponse(index, indexMappingHash, responseMap, true)).toList();
}
}

static List<FieldCapabilitiesIndexResponse> readList(StreamInput input) throws IOException {
if (input.getVersion().before(MAPPING_HASH_VERSION)) {
return input.readList(FieldCapabilitiesIndexResponse::new);
}
final List<FieldCapabilitiesIndexResponse> ungroupedList = input.readList(FieldCapabilitiesIndexResponse::new);
final List<GroupByMappingHash> groups = input.readList(GroupByMappingHash::new);
return Stream.concat(ungroupedList.stream(), groups.stream().flatMap(g -> g.getResponses().stream())).toList();
}

static void writeList(StreamOutput output, List<FieldCapabilitiesIndexResponse> responses) throws IOException {
if (output.getVersion().before(MAPPING_HASH_VERSION)) {
output.writeCollection(responses);
return;
}
final Predicate<FieldCapabilitiesIndexResponse> canGroup = r -> r.canMatch && r.indexMappingHash != null;
final List<FieldCapabilitiesIndexResponse> ungroupedResponses = responses.stream().filter(r -> canGroup.test(r) == false).toList();
final List<GroupByMappingHash> groupedResponses = responses.stream()
.filter(canGroup)
.collect(Collectors.groupingBy(r -> r.indexMappingHash))
.values()
.stream()
.map(rs -> {
final String indexMappingHash = rs.get(0).indexMappingHash;
final Map<String, IndexFieldCapabilities> responseMap = rs.get(0).responseMap;
final List<String> indices = rs.stream().map(r -> r.indexName).toList();
return new GroupByMappingHash(indices, indexMappingHash, responseMap);
})
.toList();
output.writeList(ungroupedResponses);
output.writeList(groupedResponses);
}

/**
Expand All @@ -46,6 +125,14 @@ public String getIndexName() {
return indexName;
}

/**
* Returns the index mapping hash associated with this index if exists
*/
@Nullable
public String getIndexMappingHash() {
return indexMappingHash;
}

public boolean canMatch() {
return canMatch;
}
Expand All @@ -69,23 +156,19 @@ Version getOriginVersion() {
return originVersion;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(indexName);
out.writeMap(responseMap, StreamOutput::writeString, (valueOut, fc) -> fc.writeTo(valueOut));
out.writeBoolean(canMatch);
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
FieldCapabilitiesIndexResponse that = (FieldCapabilitiesIndexResponse) o;
return canMatch == that.canMatch && Objects.equals(indexName, that.indexName) && Objects.equals(responseMap, that.responseMap);
return canMatch == that.canMatch
&& Objects.equals(indexName, that.indexName)
&& Objects.equals(indexMappingHash, that.indexMappingHash)
&& Objects.equals(responseMap, that.responseMap);
}

@Override
public int hashCode() {
return Objects.hash(indexName, responseMap, canMatch);
return Objects.hash(indexName, indexMappingHash, responseMap, canMatch);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,14 @@ class FieldCapabilitiesNodeResponse extends ActionResponse implements Writeable

FieldCapabilitiesNodeResponse(StreamInput in) throws IOException {
super(in);
this.indexResponses = in.readList(FieldCapabilitiesIndexResponse::new);
this.indexResponses = FieldCapabilitiesIndexResponse.readList(in);
this.failures = in.readMap(ShardId::new, StreamInput::readException);
this.unmatchedShardIds = in.readSet(ShardId::new);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeList(indexResponses);
FieldCapabilitiesIndexResponse.writeList(out, indexResponses);
out.writeMap(failures, (o, v) -> v.writeTo(o), StreamOutput::writeException);
out.writeCollection(unmatchedShardIds);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public FieldCapabilitiesResponse(StreamInput in) throws IOException {
super(in);
indices = in.readStringArray();
this.responseMap = in.readMap(StreamInput::readString, FieldCapabilitiesResponse::readField);
indexResponses = in.readList(FieldCapabilitiesIndexResponse::new);
this.indexResponses = FieldCapabilitiesIndexResponse.readList(in);
this.failures = in.readList(FieldCapabilitiesFailure::new);
}

Expand Down Expand Up @@ -141,7 +141,7 @@ private static Map<String, FieldCapabilities> readField(StreamInput in) throws I
public void writeTo(StreamOutput out) throws IOException {
out.writeStringArray(indices);
out.writeMap(responseMap, StreamOutput::writeString, FieldCapabilitiesResponse::writeField);
out.writeList(indexResponses);
FieldCapabilitiesIndexResponse.writeList(out, indexResponses);
out.writeList(failures);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;

Expand All @@ -55,8 +56,8 @@ public class TransportFieldCapabilitiesAction extends HandledTransportAction<Fie
private final ClusterService clusterService;
private final IndexNameExpressionResolver indexNameExpressionResolver;

private final FieldCapabilitiesFetcher fieldCapabilitiesFetcher;
private final Predicate<String> metadataFieldPred;
private final IndicesService indicesService;
private final boolean ccsCheckCompatibility;

@Inject
Expand All @@ -73,7 +74,7 @@ public TransportFieldCapabilitiesAction(
this.transportService = transportService;
this.clusterService = clusterService;
this.indexNameExpressionResolver = indexNameExpressionResolver;
this.fieldCapabilitiesFetcher = new FieldCapabilitiesFetcher(indicesService);
this.indicesService = indicesService;
final Set<String> metadataFields = indicesService.getAllMetadataFields();
this.metadataFieldPred = metadataFields::contains;
transportService.registerRequestHandler(
Expand Down Expand Up @@ -112,6 +113,17 @@ protected void doExecute(Task task, FieldCapabilitiesRequest request, final Acti
checkIndexBlocks(clusterState, concreteIndices);

final Map<String, FieldCapabilitiesIndexResponse> indexResponses = Collections.synchronizedMap(new HashMap<>());
// This map is used to share the index response for indices which have the same index mapping hash to reduce the memory usage.
final Map<String, Map<String, IndexFieldCapabilities>> indexMappingHashToResponses = Collections.synchronizedMap(new HashMap<>());
final Consumer<FieldCapabilitiesIndexResponse> handleIndexResponse = resp -> {
if (resp.canMatch() && resp.getIndexMappingHash() != null) {
Map<String, IndexFieldCapabilities> curr = indexMappingHashToResponses.putIfAbsent(resp.getIndexMappingHash(), resp.get());
if (curr != null) {
resp = new FieldCapabilitiesIndexResponse(resp.getIndexName(), resp.getIndexMappingHash(), curr, true);
}
}
indexResponses.putIfAbsent(resp.getIndexName(), resp);
};
final FailureCollector indexFailures = new FailureCollector();
// One for each cluster including the local cluster
final CountDown completionCounter = new CountDown(1 + remoteClusterIndices.size());
Expand All @@ -125,7 +137,7 @@ protected void doExecute(Task task, FieldCapabilitiesRequest request, final Acti
nowInMillis,
concreteIndices,
threadPool.executor(ThreadPool.Names.SEARCH_COORDINATION),
indexResponse -> indexResponses.putIfAbsent(indexResponse.getIndexName(), indexResponse),
handleIndexResponse,
indexFailures::collect,
countDown
);
Expand All @@ -141,7 +153,9 @@ protected void doExecute(Task task, FieldCapabilitiesRequest request, final Acti
remoteClusterClient.fieldCaps(remoteRequest, ActionListener.wrap(response -> {
for (FieldCapabilitiesIndexResponse resp : response.getIndexResponses()) {
String indexName = RemoteClusterAware.buildRemoteIndexName(clusterAlias, resp.getIndexName());
indexResponses.putIfAbsent(indexName, new FieldCapabilitiesIndexResponse(indexName, resp.get(), resp.canMatch()));
handleIndexResponse.accept(
new FieldCapabilitiesIndexResponse(indexName, resp.getIndexMappingHash(), resp.get(), resp.canMatch())
);
}
for (FieldCapabilitiesFailure failure : response.getFailures()) {
Exception ex = failure.getException();
Expand Down Expand Up @@ -347,12 +361,13 @@ public void messageReceived(FieldCapabilitiesNodeRequest request, TransportChann
final Map<String, List<ShardId>> groupedShardIds = request.shardIds()
.stream()
.collect(Collectors.groupingBy(ShardId::getIndexName));
final FieldCapabilitiesFetcher fetcher = new FieldCapabilitiesFetcher(indicesService);
for (List<ShardId> shardIds : groupedShardIds.values()) {
final Map<ShardId, Exception> failures = new HashMap<>();
final Set<ShardId> unmatched = new HashSet<>();
for (ShardId shardId : shardIds) {
try {
final FieldCapabilitiesIndexResponse response = fieldCapabilitiesFetcher.fetch(
final FieldCapabilitiesIndexResponse response = fetcher.fetch(
shardId,
request.fields(),
request.filters(),
Expand Down
Loading

0 comments on commit 69e898d

Please sign in to comment.