diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationService.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationService.java index b818bad69e4e2..c2bd7a2d1e70d 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationService.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationService.java @@ -12,7 +12,6 @@ import org.elasticsearch.ElasticsearchSecurityException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.DocWriteRequest; -import org.elasticsearch.action.StepListener; import org.elasticsearch.action.admin.indices.alias.Alias; import org.elasticsearch.action.admin.indices.alias.IndicesAliasesAction; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; @@ -27,6 +26,7 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.util.concurrent.ListenableFuture; import org.elasticsearch.core.Nullable; import org.elasticsearch.common.Strings; import org.elasticsearch.core.Tuple; @@ -285,18 +285,24 @@ private void authorizeAction(final RequestInfo requestInfo, final String request final AsyncSupplier> authorizedIndicesSupplier = new CachingAsyncSupplier<>(authzIndicesListener -> authzEngine.loadAuthorizedIndices(requestInfo, authzInfo, metadata.getIndicesLookup(), authzIndicesListener)); - final AsyncSupplier resolvedIndicesAsyncSupplier = new CachingAsyncSupplier<>((resolvedIndicesListener) -> { - authorizedIndicesSupplier.getAsync(ActionListener.wrap(authorizedIndices -> { - resolveIndexNames(action, request, metadata, authorizedIndices, resolvedIndicesListener); - }, e -> { - auditTrail.accessDenied(requestId, authentication, action, request, authzInfo); - if (e instanceof IndexNotFoundException) { - listener.onFailure(e); - } else { - listener.onFailure(denialException(authentication, action, request, e)); - } - })); - }); + final AsyncSupplier resolvedIndicesAsyncSupplier = new CachingAsyncSupplier<>(resolvedIndicesListener -> + authorizedIndicesSupplier.getAsync( + ActionListener.wrap( + authorizedIndices -> + resolvedIndicesListener.onResponse( + indicesAndAliasesResolver.resolve(action, request, metadata, authorizedIndices) + ), + e -> { + auditTrail.accessDenied(requestId, authentication, action, request, authzInfo); + if (e instanceof IndexNotFoundException) { + listener.onFailure(e); + } else { + listener.onFailure(denialException(authentication, action, request, e)); + } + } + ) + ) + ); authzEngine.authorizeIndexAction(requestInfo, authzInfo, resolvedIndicesAsyncSupplier, metadata.getIndicesLookup(), wrapPreservingContext(new AuthorizationResultListener<>(result -> handleIndexActionAuthorizationResult(result, requestInfo, requestId, authzInfo, authzEngine, authorizedIndicesSupplier, @@ -368,21 +374,19 @@ private void runRequestInterceptors(RequestInfo requestInfo, AuthorizationInfo a if (requestInterceptors.isEmpty()) { listener.onResponse(null); } else { - Iterator requestInterceptorIterator = requestInterceptors.iterator(); - final StepListener firstStepListener = new StepListener<>(); - final RequestInterceptor first = requestInterceptorIterator.next(); - - StepListener prevListener = firstStepListener; - while (requestInterceptorIterator.hasNext()) { - final RequestInterceptor nextInterceptor = requestInterceptorIterator.next(); - final StepListener current = new StepListener<>(); - prevListener.whenComplete(v -> nextInterceptor.intercept(requestInfo, authorizationEngine, authorizationInfo, current), - listener::onFailure); - prevListener = current; - } - - prevListener.addListener(listener); - first.intercept(requestInfo, authorizationEngine, authorizationInfo, firstStepListener); + final Iterator requestInterceptorIterator = requestInterceptors.iterator(); + requestInterceptorIterator.next().intercept(requestInfo, authorizationEngine, authorizationInfo, + new ActionListener.Delegating<>(listener) { + @Override + public void onResponse(Void unused) { + if (requestInterceptorIterator.hasNext()) { + requestInterceptorIterator.next().intercept(requestInfo, authorizationEngine, authorizationInfo, this); + } else { + listener.onResponse(null); + } + } + } + ); } } @@ -594,11 +598,6 @@ private static String getAction(BulkItemRequest item) { throw new IllegalArgumentException("No equivalent action for opType [" + docWriteRequest.opType() + "]"); } - private void resolveIndexNames(String action, TransportRequest request, Metadata metadata, Set authorizedIndices, - ActionListener listener) { - listener.onResponse(indicesAndAliasesResolver.resolve(action, request, metadata, authorizedIndices)); - } - private void putTransientIfNonExisting(String key, Object value) { Object existing = threadContext.getTransient(key); if (existing == null) { @@ -712,22 +711,27 @@ private void handleFailure(boolean audit, @Nullable String context, @Nullable Ex private static class CachingAsyncSupplier implements AsyncSupplier { private final AsyncSupplier asyncSupplier; - private V value = null; + private volatile ListenableFuture valueFuture = null; private CachingAsyncSupplier(AsyncSupplier supplier) { this.asyncSupplier = supplier; } @Override - public synchronized void getAsync(ActionListener listener) { - if (value == null) { - asyncSupplier.getAsync(ActionListener.wrap(loaded -> { - value = loaded; - listener.onResponse(value); - }, listener::onFailure)); - } else { - listener.onResponse(value); + public void getAsync(ActionListener listener) { + if (valueFuture == null) { + boolean firstInvocation = false; + synchronized (this) { + if (valueFuture == null) { + valueFuture = new ListenableFuture<>(); + firstInvocation = true; + } + } + if (firstInvocation) { + asyncSupplier.getAsync(valueFuture); + } } + valueFuture.addListener(listener); } }