Skip to content

Commit

Permalink
Security: Simplify security index listeners (#30466)
Browse files Browse the repository at this point in the history
This commit adds a general state listener to the SecurityIndexManager,
and replaces the existing health and up-to-date listeners with that. It
also moves helper methods relating to health to SecurityIndexManager
from SecurityLifecycleService.
  • Loading branch information
rjernst committed May 11, 2018
1 parent 40e7648 commit c020ba6
Show file tree
Hide file tree
Showing 39 changed files with 423 additions and 596 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -234,8 +234,8 @@
import static java.util.Collections.singletonList;
import static org.elasticsearch.cluster.metadata.IndexMetaData.INDEX_FORMAT_SETTING;
import static org.elasticsearch.xpack.core.XPackSettings.HTTP_SSL_ENABLED;
import static org.elasticsearch.xpack.core.security.SecurityLifecycleServiceField.SECURITY_INDEX_NAME;
import static org.elasticsearch.xpack.core.security.SecurityLifecycleServiceField.SECURITY_TEMPLATE_NAME;
import static org.elasticsearch.xpack.security.support.SecurityIndexManager.SECURITY_TEMPLATE_NAME;
import static org.elasticsearch.xpack.security.SecurityLifecycleService.SECURITY_INDEX_NAME;
import static org.elasticsearch.xpack.security.support.SecurityIndexManager.INTERNAL_INDEX_FORMAT;

public class Security extends Plugin implements ActionPlugin, IngestPlugin, NetworkPlugin, ClusterPlugin, DiscoveryPlugin, MapperPlugin,
Expand Down Expand Up @@ -442,8 +442,7 @@ Collection<Object> createComponents(Client client, ThreadPool threadPool, Cluste
components.add(realms);
components.add(reservedRealm);

securityLifecycleService.securityIndex().addIndexHealthChangeListener(nativeRoleMappingStore::onSecurityIndexHealthChange);
securityLifecycleService.securityIndex().addIndexOutOfDateListener(nativeRoleMappingStore::onSecurityIndexOutOfDateChange);
securityLifecycleService.securityIndex().addIndexStateListener(nativeRoleMappingStore::onSecurityIndexStateChange);

AuthenticationFailureHandler failureHandler = null;
String extensionName = null;
Expand Down Expand Up @@ -475,8 +474,7 @@ Collection<Object> createComponents(Client client, ThreadPool threadPool, Cluste
}
final CompositeRolesStore allRolesStore = new CompositeRolesStore(settings, fileRolesStore, nativeRolesStore,
reservedRolesStore, rolesProviders, threadPool.getThreadContext(), getLicenseState());
securityLifecycleService.securityIndex().addIndexHealthChangeListener(allRolesStore::onSecurityIndexHealthChange);
securityLifecycleService.securityIndex().addIndexOutOfDateListener(allRolesStore::onSecurityIndexOutOfDateChange);
securityLifecycleService.securityIndex().addIndexStateListener(allRolesStore::onSecurityIndexStateChange);
// to keep things simple, just invalidate all cached entries on license change. this happens so rarely that the impact should be
// minimal
getLicenseState().addListener(allRolesStore::invalidateAll);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,6 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Predicate;

import static org.elasticsearch.xpack.core.security.SecurityLifecycleServiceField.SECURITY_INDEX_NAME;

/**
* This class is used to provide a lifecycle for services that is based on the cluster's state
Expand All @@ -51,8 +46,6 @@ public class SecurityLifecycleService extends AbstractComponent implements Clust
public static final String INTERNAL_SECURITY_INDEX = SecurityIndexManager.INTERNAL_SECURITY_INDEX;
public static final String SECURITY_INDEX_NAME = ".security";

private static final Version MIN_READ_VERSION = Version.V_5_0_0;

private final Settings settings;
private final ThreadPool threadPool;
private final IndexAuditTrail indexAuditTrail;
Expand Down Expand Up @@ -127,36 +120,7 @@ private void close() {
}
}

public static boolean securityIndexMappingSufficientToRead(ClusterState clusterState, Logger logger) {
return checkMappingVersions(clusterState, logger, MIN_READ_VERSION::onOrBefore);
}

static boolean securityIndexMappingUpToDate(ClusterState clusterState, Logger logger) {
return checkMappingVersions(clusterState, logger, Version.CURRENT::equals);
}

private static boolean checkMappingVersions(ClusterState clusterState, Logger logger, Predicate<Version> versionPredicate) {
return SecurityIndexManager.checkIndexMappingVersionMatches(SECURITY_INDEX_NAME, clusterState, logger, versionPredicate);
}

public static List<String> indexNames() {
return Collections.unmodifiableList(Arrays.asList(SECURITY_INDEX_NAME, INTERNAL_SECURITY_INDEX));
}

/**
* Is the move from {@code previousHealth} to {@code currentHealth} a move from an unhealthy ("RED") index state to a healthy
* ("non-RED") state.
*/
public static boolean isMoveFromRedToNonRed(ClusterIndexHealth previousHealth, ClusterIndexHealth currentHealth) {
return (previousHealth == null || previousHealth.getStatus() == ClusterHealthStatus.RED)
&& currentHealth != null && currentHealth.getStatus() != ClusterHealthStatus.RED;
}

/**
* Is the move from {@code previousHealth} to {@code currentHealth} a move from index-exists to index-deleted
*/
public static boolean isIndexDeleted(ClusterIndexHealth previousHealth, ClusterIndexHealth currentHealth) {
return previousHealth != null && currentHealth == null;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.elasticsearch.index.reindex.ScrollableHitSource;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.ThreadPool.Names;
import org.elasticsearch.xpack.core.security.SecurityLifecycleServiceField;

import java.time.Instant;
import java.time.temporal.ChronoUnit;
Expand All @@ -31,6 +30,7 @@
import static org.elasticsearch.action.support.TransportActions.isShardNotAvailableException;
import static org.elasticsearch.xpack.core.ClientHelper.SECURITY_ORIGIN;
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
import static org.elasticsearch.xpack.security.SecurityLifecycleService.SECURITY_INDEX_NAME;

/**
* Responsible for cleaning the invalidated tokens from the invalidated tokens index.
Expand All @@ -50,7 +50,7 @@ final class ExpiredTokenRemover extends AbstractRunnable {

@Override
public void doRun() {
SearchRequest searchRequest = new SearchRequest(SecurityLifecycleServiceField.SECURITY_INDEX_NAME);
SearchRequest searchRequest = new SearchRequest(SECURITY_INDEX_NAME);
DeleteByQueryRequest expiredDbq = new DeleteByQueryRequest(searchRequest);
if (timeout != TimeValue.MINUS_ONE) {
expiredDbq.setTimeout(timeout);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public static Map<String, Realm.Factory> getFactories(ThreadPool threadPool, Res
map.put(FileRealmSettings.TYPE, config -> new FileRealm(config, resourceWatcherService));
map.put(NativeRealmSettings.TYPE, config -> {
final NativeRealm nativeRealm = new NativeRealm(config, nativeUsersStore);
securityLifecycleService.securityIndex().addIndexHealthChangeListener(nativeRealm::onSecurityIndexHealthChange);
securityLifecycleService.securityIndex().addIndexStateListener(nativeRealm::onSecurityIndexStateChange);
return nativeRealm;
});
map.put(LdapRealmSettings.AD_TYPE, config -> new LdapRealm(LdapRealmSettings.AD_TYPE, config, sslService,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@
import org.elasticsearch.xpack.core.XPackField;
import org.elasticsearch.xpack.core.XPackSettings;
import org.elasticsearch.xpack.core.security.ScrollHelper;
import org.elasticsearch.xpack.core.security.SecurityLifecycleServiceField;
import org.elasticsearch.xpack.core.security.authc.Authentication;
import org.elasticsearch.xpack.core.security.authc.KeyAndTimestamp;
import org.elasticsearch.xpack.core.security.authc.TokenMetaData;
Expand Down Expand Up @@ -118,6 +117,7 @@
import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK;
import static org.elasticsearch.xpack.core.ClientHelper.SECURITY_ORIGIN;
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
import static org.elasticsearch.xpack.security.SecurityLifecycleService.SECURITY_INDEX_NAME;

/**
* Service responsible for the creation, validation, and other management of {@link UserToken}
Expand Down Expand Up @@ -256,7 +256,7 @@ public void createUserToken(Authentication authentication, Authentication origin
.endObject();
builder.endObject();
IndexRequest request =
client.prepareIndex(SecurityLifecycleServiceField.SECURITY_INDEX_NAME, TYPE, getTokenDocumentId(userToken))
client.prepareIndex(SECURITY_INDEX_NAME, TYPE, getTokenDocumentId(userToken))
.setOpType(OpType.CREATE)
.setSource(builder)
.setRefreshPolicy(RefreshPolicy.WAIT_UNTIL)
Expand Down Expand Up @@ -372,7 +372,7 @@ void decodeToken(String token, ActionListener<UserToken> listener) throws IOExce
decryptTokenId(in, cipher, version, ActionListener.wrap(tokenId ->
lifecycleService.securityIndex().prepareIndexIfNeededThenExecute(listener::onFailure, () -> {
final GetRequest getRequest =
client.prepareGet(SecurityLifecycleServiceField.SECURITY_INDEX_NAME, TYPE,
client.prepareGet(SECURITY_INDEX_NAME, TYPE,
getTokenDocumentId(tokenId)).request();
executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN, getRequest,
ActionListener.<GetResponse>wrap(response -> {
Expand Down Expand Up @@ -533,7 +533,7 @@ private void indexBwcInvalidation(UserToken userToken, ActionListener<Boolean> l
listener.onFailure(invalidGrantException("failed to invalidate token"));
} else {
final String invalidatedTokenId = getInvalidatedTokenDocumentId(userToken);
IndexRequest indexRequest = client.prepareIndex(SecurityLifecycleServiceField.SECURITY_INDEX_NAME, TYPE, invalidatedTokenId)
IndexRequest indexRequest = client.prepareIndex(SECURITY_INDEX_NAME, TYPE, invalidatedTokenId)
.setOpType(OpType.CREATE)
.setSource("doc_type", INVALIDATED_TOKEN_DOC_TYPE, "expiration_time", expirationEpochMilli)
.setRefreshPolicy(RefreshPolicy.WAIT_UNTIL)
Expand Down Expand Up @@ -577,7 +577,7 @@ private void indexInvalidation(String tokenDocId, Version version, ActionListene
if (attemptCount.get() > 5) {
listener.onFailure(invalidGrantException("failed to invalidate token"));
} else {
UpdateRequest request = client.prepareUpdate(SecurityLifecycleServiceField.SECURITY_INDEX_NAME, TYPE, tokenDocId)
UpdateRequest request = client.prepareUpdate(SECURITY_INDEX_NAME, TYPE, tokenDocId)
.setDoc(srcPrefix, Collections.singletonMap("invalidated", true))
.setVersion(documentVersion)
.setRefreshPolicy(RefreshPolicy.WAIT_UNTIL)
Expand Down Expand Up @@ -609,7 +609,7 @@ private void indexInvalidation(String tokenDocId, Version version, ActionListene
|| isShardNotAvailableException(cause)) {
attemptCount.incrementAndGet();
executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN,
client.prepareGet(SecurityLifecycleServiceField.SECURITY_INDEX_NAME, TYPE, tokenDocId).request(),
client.prepareGet(SECURITY_INDEX_NAME, TYPE, tokenDocId).request(),
ActionListener.<GetResponse>wrap(getResult -> {
if (getResult.isExists()) {
Map<String, Object> source = getResult.getSource();
Expand Down Expand Up @@ -674,7 +674,7 @@ private void findTokenFromRefreshToken(String refreshToken, ActionListener<Tuple
if (attemptCount.get() > 5) {
listener.onFailure(invalidGrantException("could not refresh the requested token"));
} else {
SearchRequest request = client.prepareSearch(SecurityLifecycleServiceField.SECURITY_INDEX_NAME)
SearchRequest request = client.prepareSearch(SECURITY_INDEX_NAME)
.setQuery(QueryBuilders.boolQuery()
.filter(QueryBuilders.termQuery("doc_type", "token"))
.filter(QueryBuilders.termQuery("refresh_token.token", refreshToken)))
Expand Down Expand Up @@ -718,7 +718,7 @@ private void innerRefresh(String tokenDocId, Authentication userAuth, ActionList
if (attemptCount.getAndIncrement() > 5) {
listener.onFailure(invalidGrantException("could not refresh the requested token"));
} else {
GetRequest getRequest = client.prepareGet(SecurityLifecycleServiceField.SECURITY_INDEX_NAME, TYPE, tokenDocId).request();
GetRequest getRequest = client.prepareGet(SECURITY_INDEX_NAME, TYPE, tokenDocId).request();
executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN, getRequest,
ActionListener.<GetResponse>wrap(response -> {
if (response.isExists()) {
Expand All @@ -739,7 +739,7 @@ private void innerRefresh(String tokenDocId, Authentication userAuth, ActionList
in.setVersion(authVersion);
Authentication authentication = new Authentication(in);
UpdateRequest updateRequest =
client.prepareUpdate(SecurityLifecycleServiceField.SECURITY_INDEX_NAME, TYPE, tokenDocId)
client.prepareUpdate(SECURITY_INDEX_NAME, TYPE, tokenDocId)
.setVersion(response.getVersion())
.setDoc("refresh_token", Collections.singletonMap("refreshed", true))
.setRefreshPolicy(RefreshPolicy.WAIT_UNTIL)
Expand Down Expand Up @@ -854,7 +854,7 @@ public void findActiveTokensForRealm(String realmName, ActionListener<Collection
.should(QueryBuilders.termQuery("refresh_token.invalidated", false))
);

final SearchRequest request = client.prepareSearch(SecurityLifecycleServiceField.SECURITY_INDEX_NAME)
final SearchRequest request = client.prepareSearch(SECURITY_INDEX_NAME)
.setScroll(TimeValue.timeValueSeconds(10L))
.setQuery(boolQuery)
.setVersion(false)
Expand Down Expand Up @@ -936,8 +936,8 @@ private void checkIfTokenIsRevoked(UserToken userToken, ActionListener<UserToken
} else {
lifecycleService.securityIndex().prepareIndexIfNeededThenExecute(listener::onFailure, () -> {
MultiGetRequest mGetRequest = client.prepareMultiGet()
.add(SecurityLifecycleServiceField.SECURITY_INDEX_NAME, TYPE, getInvalidatedTokenDocumentId(userToken))
.add(SecurityLifecycleServiceField.SECURITY_INDEX_NAME, TYPE, getTokenDocumentId(userToken))
.add(SECURITY_INDEX_NAME, TYPE, getInvalidatedTokenDocumentId(userToken))
.add(SECURITY_INDEX_NAME, TYPE, getTokenDocumentId(userToken))
.request();
executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN,
mGetRequest,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,16 @@
package org.elasticsearch.xpack.security.authc.esnative;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.health.ClusterIndexHealth;
import org.elasticsearch.xpack.core.security.authc.AuthenticationResult;
import org.elasticsearch.xpack.core.security.authc.RealmConfig;
import org.elasticsearch.xpack.core.security.authc.esnative.NativeRealmSettings;
import org.elasticsearch.xpack.core.security.authc.support.UsernamePasswordToken;
import org.elasticsearch.xpack.core.security.user.User;
import org.elasticsearch.xpack.security.authc.support.CachingUsernamePasswordRealm;
import org.elasticsearch.xpack.security.support.SecurityIndexManager;

import static org.elasticsearch.xpack.security.support.SecurityIndexManager.isIndexDeleted;
import static org.elasticsearch.xpack.security.support.SecurityIndexManager.isMoveFromRedToNonRed;

/**
* User/password realm that is backed by an Elasticsearch index
Expand All @@ -37,12 +39,8 @@ protected void doAuthenticate(UsernamePasswordToken token, ActionListener<Authen
userStore.verifyPassword(token.principal(), token.credentials(), listener);
}

public void onSecurityIndexHealthChange(ClusterIndexHealth previousHealth, ClusterIndexHealth currentHealth) {
final boolean movedFromRedToNonRed = (previousHealth == null || previousHealth.getStatus() == ClusterHealthStatus.RED)
&& currentHealth != null && currentHealth.getStatus() != ClusterHealthStatus.RED;
final boolean indexDeleted = previousHealth != null && currentHealth == null;

if (movedFromRedToNonRed || indexDeleted) {
public void onSecurityIndexStateChange(SecurityIndexManager.State previousState, SecurityIndexManager.State currentState) {
if (isMoveFromRedToNonRed(previousState, currentState) || isIndexDeleted(previousState, currentState)) {
clearCache();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@
import static org.elasticsearch.xpack.core.ClientHelper.SECURITY_ORIGIN;
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
import static org.elasticsearch.xpack.core.ClientHelper.stashWithOrigin;
import static org.elasticsearch.xpack.core.security.SecurityLifecycleServiceField.SECURITY_INDEX_NAME;
import static org.elasticsearch.xpack.security.SecurityLifecycleService.SECURITY_INDEX_NAME;

/**
* NativeUsersStore is a store for users that reads from an Elasticsearch index. This store is responsible for fetching the full
Expand Down
Loading

0 comments on commit c020ba6

Please sign in to comment.