Skip to content

Commit

Permalink
Introduce Predicate Utilities for always true/false use-cases (elasti…
Browse files Browse the repository at this point in the history
…c#105881)

Just a suggetion. I think this would save us a bit of memory here and
there. We have loads of places where the always true lambdas are used
with `Predicate.or/and`. Found this initially when looking into field
caps performance where we used to heavily compose these but many spots
in security and index name resolution gain from these predicates.
The better toString also helps in some cases at least when debugging.
  • Loading branch information
original-brownbear authored Mar 4, 2024
1 parent 9c1a079 commit fc8e2b7
Show file tree
Hide file tree
Showing 55 changed files with 246 additions and 93 deletions.
92 changes: 92 additions & 0 deletions libs/core/src/main/java/org/elasticsearch/core/Predicates.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.core;

import java.util.function.Predicate;

/**
* Utilities around predicates.
*/
public enum Predicates {
;

@SuppressWarnings("rawtypes")
private static final Predicate NEVER = new Predicate() {
@Override
public boolean test(Object o) {
return false;
}

@Override
public Predicate and(Predicate other) {
return this;
}

@Override
public Predicate negate() {
return ALWAYS;
}

@Override
public Predicate or(Predicate other) {
return other;
}

@Override
public String toString() {
return "Predicate[NEVER]";
}
};

@SuppressWarnings("rawtypes")
private static final Predicate ALWAYS = new Predicate() {
@Override
public boolean test(Object o) {
return true;
}

@Override
public Predicate and(Predicate other) {
return other;
}

@Override
public Predicate negate() {
return NEVER;
}

@Override
public Predicate or(Predicate other) {
return this;
}

@Override
public String toString() {
return "Predicate[ALWAYS]";
}
};

/**
* @return a predicate that accepts all input values
* @param <T> type of the predicate
*/
@SuppressWarnings("unchecked")
public static <T> Predicate<T> always() {
return (Predicate<T>) ALWAYS;
}

/**
* @return a predicate that rejects all input values
* @param <T> type of the predicate
*/
@SuppressWarnings("unchecked")
public static <T> Predicate<T> never() {
return (Predicate<T>) NEVER;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.core.Predicates;
import org.elasticsearch.ingest.AbstractProcessor;
import org.elasticsearch.ingest.ConfigurationUtils;
import org.elasticsearch.ingest.IngestDocument;
Expand Down Expand Up @@ -100,7 +101,7 @@ private static Consumer<IngestDocument> buildExecution(
final Predicate<String> keyFilter;
if (includeKeys == null) {
if (excludeKeys == null) {
keyFilter = key -> true;
keyFilter = Predicates.always();
} else {
keyFilter = key -> excludeKeys.contains(key) == false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.core.Predicates;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.search.internal.AliasFilter;
Expand Down Expand Up @@ -85,8 +86,8 @@ protected void masterOperation(
final String[] aliases = indexNameExpressionResolver.indexAliases(
clusterState,
index,
aliasMetadata -> true,
dataStreamAlias -> true,
Predicates.always(),
Predicates.always(),
true,
indicesAndAliases
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.cluster.version.CompatibilityVersions;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.core.Predicates;
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.Index;
Expand Down Expand Up @@ -89,7 +90,7 @@ protected void masterOperation(
final CancellableTask cancellableTask = (CancellableTask) task;

final Predicate<ClusterState> acceptableClusterStatePredicate = request.waitForMetadataVersion() == null
? clusterState -> true
? Predicates.always()
: clusterState -> clusterState.metadata().version() >= request.waitForMetadataVersion();

final Predicate<ClusterState> acceptableClusterStateOrFailedPredicate = request.local()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.elasticsearch.common.logging.DeprecationCategory;
import org.elasticsearch.common.logging.DeprecationLogger;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.core.Predicates;
import org.elasticsearch.core.UpdateForV9;
import org.elasticsearch.indices.SystemIndices;
import org.elasticsearch.indices.SystemIndices.SystemIndexAccessLevel;
Expand Down Expand Up @@ -160,7 +161,7 @@ private static void checkSystemIndexAccess(
) {
final Predicate<String> systemIndexAccessAllowPredicate;
if (systemIndexAccessLevel == SystemIndexAccessLevel.NONE) {
systemIndexAccessAllowPredicate = indexName -> false;
systemIndexAccessAllowPredicate = Predicates.never();
} else if (systemIndexAccessLevel == SystemIndexAccessLevel.RESTRICTED) {
systemIndexAccessAllowPredicate = systemIndices.getProductSystemIndexNamePredicate(threadContext);
} else {
Expand Down
5 changes: 3 additions & 2 deletions server/src/main/java/org/elasticsearch/action/bulk/Retry.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.core.Predicates;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.Scheduler;
Expand Down Expand Up @@ -104,14 +105,14 @@ static class RetryHandler extends DelegatingActionListener<BulkResponse, BulkRes
public void onResponse(BulkResponse bulkItemResponses) {
if (bulkItemResponses.hasFailures() == false) {
// we're done here, include all responses
addResponses(bulkItemResponses, (r -> true));
addResponses(bulkItemResponses, Predicates.always());
finishHim();
} else {
if (canRetry(bulkItemResponses)) {
addResponses(bulkItemResponses, (r -> r.isFailed() == false));
retry(createBulkRequestForRetry(bulkItemResponses));
} else {
addResponses(bulkItemResponses, (r -> true));
addResponses(bulkItemResponses, Predicates.always());
finishHim();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.core.Predicates;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.rest.RestStatus;

Expand Down Expand Up @@ -183,7 +184,7 @@ public void onResponse(BulkResponse bulkItemResponses) {
bulkItemResponses.getItems().length
);
// we're done here, include all responses
addResponses(bulkItemResponses, (r -> true));
addResponses(bulkItemResponses, Predicates.always());
listener.onResponse(getAccumulatedResponse());
} else {
if (canRetry(bulkItemResponses)) {
Expand All @@ -201,7 +202,7 @@ public void onResponse(BulkResponse bulkItemResponses) {
bulkItemResponses.getTook(),
bulkItemResponses.getItems().length
);
addResponses(bulkItemResponses, (r -> true));
addResponses(bulkItemResponses, Predicates.always());
listener.onResponse(getAccumulatedResponse());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.core.Predicates;
import org.elasticsearch.index.mapper.TimeSeriesParams;
import org.elasticsearch.xcontent.ConstructingObjectParser;
import org.elasticsearch.xcontent.InstantiatingObjectParser;
Expand Down Expand Up @@ -567,7 +568,7 @@ private String[] filterIndices(int length, Predicate<IndexCaps> pred) {
}

FieldCapabilities build(boolean withIndices) {
final String[] indices = withIndices ? filterIndices(totalIndices, ic -> true) : null;
final String[] indices = withIndices ? filterIndices(totalIndices, Predicates.always()) : null;

// Iff this field is searchable in some indices AND non-searchable in others
// we record the list of non-searchable indices
Expand Down Expand Up @@ -603,7 +604,7 @@ FieldCapabilities build(boolean withIndices) {
// Collect all indices that have this field. If it is marked differently in different indices, we cannot really
// make a decisions which index is "right" and which index is "wrong" so collecting all indices where this field
// is present is probably the only sensible thing to do here
metricConflictsIndices = Objects.requireNonNullElseGet(indices, () -> filterIndices(totalIndices, ic -> true));
metricConflictsIndices = Objects.requireNonNullElseGet(indices, () -> filterIndices(totalIndices, Predicates.always()));
} else {
metricConflictsIndices = null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import org.elasticsearch.TransportVersion;
import org.elasticsearch.TransportVersions;
import org.elasticsearch.core.Predicates;

import java.util.HashMap;
import java.util.Map;
Expand Down Expand Up @@ -49,7 +50,7 @@ private static Function<IndexFieldCapabilities, IndexFieldCapabilities> buildTra
String[] filters,
String[] allowedTypes
) {
Predicate<IndexFieldCapabilities> test = ifc -> true;
Predicate<IndexFieldCapabilities> test = Predicates.always();
Set<String> objects = null;
Set<String> nestedObjects = null;
if (allowedTypes.length > 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Predicates;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexNotFoundException;
Expand Down Expand Up @@ -198,8 +199,8 @@ private Map<String, OriginalIndices> buildPerIndexOriginalIndices(
String[] aliases = indexNameExpressionResolver.indexAliases(
clusterState,
index,
aliasMetadata -> true,
dataStreamAlias -> true,
Predicates.always(),
Predicates.always(),
true,
indicesAndAliases
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.elasticsearch.bootstrap;

import org.elasticsearch.core.Predicates;
import org.elasticsearch.core.SuppressForbidden;

import java.io.FilePermission;
Expand Down Expand Up @@ -201,7 +202,10 @@ public String getActions() {
// from this policy file or further restrict it to code sources
// that you specify, because Thread.stop() is potentially unsafe."
// not even sure this method still works...
private static final Permission BAD_DEFAULT_NUMBER_ONE = new BadDefaultPermission(new RuntimePermission("stopThread"), p -> true);
private static final Permission BAD_DEFAULT_NUMBER_ONE = new BadDefaultPermission(
new RuntimePermission("stopThread"),
Predicates.always()
);

// default policy file states:
// "allows anyone to listen on dynamic ports"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Predicates;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.threadpool.ThreadPool;

Expand All @@ -33,8 +34,6 @@ public class ClusterStateObserver {

public static final Predicate<ClusterState> NON_NULL_MASTER_PREDICATE = state -> state.nodes().getMasterNode() != null;

private static final Predicate<ClusterState> MATCH_ALL_CHANGES_PREDICATE = state -> true;

private final ClusterApplierService clusterApplierService;
private final ThreadPool threadPool;
private final ThreadContext contextHolder;
Expand Down Expand Up @@ -109,11 +108,11 @@ public boolean isTimedOut() {
}

public void waitForNextChange(Listener listener) {
waitForNextChange(listener, MATCH_ALL_CHANGES_PREDICATE);
waitForNextChange(listener, Predicates.always());
}

public void waitForNextChange(Listener listener, @Nullable TimeValue timeOutValue) {
waitForNextChange(listener, MATCH_ALL_CHANGES_PREDICATE, timeOutValue);
waitForNextChange(listener, Predicates.always(), timeOutValue);
}

public void waitForNextChange(Listener listener, Predicate<ClusterState> statePredicate) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Predicates;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.IndexVersion;
Expand Down Expand Up @@ -59,8 +60,6 @@
public class IndexNameExpressionResolver {
private static final DeprecationLogger deprecationLogger = DeprecationLogger.getLogger(IndexNameExpressionResolver.class);

private static final Predicate<String> ALWAYS_TRUE = s -> true;

public static final String EXCLUDED_DATA_STREAMS_KEY = "es.excluded_ds";
public static final IndexVersion SYSTEM_INDEX_ENFORCEMENT_INDEX_VERSION = IndexVersions.V_8_0_0;

Expand Down Expand Up @@ -101,7 +100,7 @@ public String[] concreteIndexNamesWithSystemIndexAccess(ClusterState state, Indi
false,
request.includeDataStreams(),
SystemIndexAccessLevel.BACKWARDS_COMPATIBLE_ONLY,
ALWAYS_TRUE,
Predicates.always(),
this.getNetNewSystemIndexPredicate()
);
return concreteIndexNames(context, request.indices());
Expand Down Expand Up @@ -397,7 +396,7 @@ Index[] concreteIndices(Context context, String... indexExpressions) {

private void checkSystemIndexAccess(Context context, Set<Index> concreteIndices) {
final Predicate<String> systemIndexAccessPredicate = context.getSystemIndexAccessPredicate();
if (systemIndexAccessPredicate == ALWAYS_TRUE) {
if (systemIndexAccessPredicate == Predicates.<String>always()) {
return;
}
doCheckSystemIndexAccess(context, concreteIndices, systemIndexAccessPredicate);
Expand Down Expand Up @@ -947,11 +946,11 @@ public Predicate<String> getSystemIndexAccessPredicate() {
final SystemIndexAccessLevel systemIndexAccessLevel = getSystemIndexAccessLevel();
final Predicate<String> systemIndexAccessLevelPredicate;
if (systemIndexAccessLevel == SystemIndexAccessLevel.NONE) {
systemIndexAccessLevelPredicate = s -> false;
systemIndexAccessLevelPredicate = Predicates.never();
} else if (systemIndexAccessLevel == SystemIndexAccessLevel.BACKWARDS_COMPATIBLE_ONLY) {
systemIndexAccessLevelPredicate = getNetNewSystemIndexPredicate();
} else if (systemIndexAccessLevel == SystemIndexAccessLevel.ALL) {
systemIndexAccessLevelPredicate = ALWAYS_TRUE;
systemIndexAccessLevelPredicate = Predicates.always();
} else {
// everything other than allowed should be included in the deprecation message
systemIndexAccessLevelPredicate = systemIndices.getProductSystemIndexNamePredicate(threadContext);
Expand Down Expand Up @@ -981,7 +980,7 @@ public static class Context {
private final Predicate<String> netNewSystemIndexPredicate;

Context(ClusterState state, IndicesOptions options, SystemIndexAccessLevel systemIndexAccessLevel) {
this(state, options, systemIndexAccessLevel, ALWAYS_TRUE, s -> false);
this(state, options, systemIndexAccessLevel, Predicates.always(), Predicates.never());
}

Context(
Expand Down Expand Up @@ -1722,7 +1721,7 @@ public ResolverContext() {
}

public ResolverContext(long startTime) {
super(null, null, startTime, false, false, false, false, SystemIndexAccessLevel.ALL, name -> false, name -> false);
super(null, null, startTime, false, false, false, false, SystemIndexAccessLevel.ALL, Predicates.never(), Predicates.never());
}

@Override
Expand Down
Loading

0 comments on commit fc8e2b7

Please sign in to comment.