Skip to content

Commit

Permalink
[CCR] Validate index privileges prior to following an index (#33758)
Browse files Browse the repository at this point in the history
Prior to following an index in the follow API, check whether current
user has sufficient privileges in the leader cluster to read and
monitor the leader index.

Also check this in the create and follow API prior to creating the
follow index.

Also introduced READ_CCR cluster privilege that include the minimal
cluster level actions that are required for ccr in the leader cluster.
So a user can follow indices in a cluster, but not use the ccr admin APIs.

Closes #33553

Co-authored-by: Jason Tedor <[email protected]>
  • Loading branch information
martijnvg and jasontedor authored Sep 28, 2018
1 parent 3d7e3b2 commit a984f8a
Show file tree
Hide file tree
Showing 13 changed files with 168 additions and 39 deletions.
4 changes: 2 additions & 2 deletions x-pack/plugin/ccr/qa/multi-cluster-with-security/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ leaderClusterTestCluster {
setting 'xpack.license.self_generated.type', 'trial'
setting 'xpack.security.enabled', 'true'
setting 'xpack.monitoring.enabled', 'false'
extraConfigFile 'roles.yml', 'roles.yml'
extraConfigFile 'roles.yml', 'leader-roles.yml'
setupCommand 'setupTestAdmin',
'bin/elasticsearch-users', 'useradd', "test_admin", '-p', 'x-pack-test-password', '-r', "superuser"
setupCommand 'setupCcrUser',
Expand Down Expand Up @@ -48,7 +48,7 @@ followClusterTestCluster {
setting 'xpack.license.self_generated.type', 'trial'
setting 'xpack.security.enabled', 'true'
setting 'xpack.monitoring.collection.enabled', 'true'
extraConfigFile 'roles.yml', 'roles.yml'
extraConfigFile 'roles.yml', 'follower-roles.yml'
setupCommand 'setupTestAdmin',
'bin/elasticsearch-users', 'useradd', "test_admin", '-p', 'x-pack-test-password', '-r', "superuser"
setupCommand 'setupCcrUser',
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
ccruser:
cluster:
- read_ccr
indices:
- names: [ 'allowed-index', 'logs-eu-*' ]
privileges:
- monitor
- read
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ public void testFollowIndex() throws Exception {
assertThat(countCcrNodeTasks(), equalTo(0));
});

// User does not have create_follow_index index privilege for 'unallowedIndex':
Exception e = expectThrows(ResponseException.class,
() -> follow("leader_cluster:" + unallowedIndex, unallowedIndex));
assertThat(e.getMessage(),
Expand All @@ -112,9 +113,22 @@ public void testFollowIndex() throws Exception {
assertThat(indexExists(adminClient(), unallowedIndex), is(false));
assertBusy(() -> assertThat(countCcrNodeTasks(), equalTo(0)));

// User does have create_follow_index index privilege on 'allowed' index,
// but not read / monitor roles on 'disallowed' index:
e = expectThrows(ResponseException.class,
() -> follow("leader_cluster:" + unallowedIndex, allowedIndex));
assertThat(e.getMessage(), containsString("insufficient privileges to follow index [unallowed-index], " +
"privilege for action [indices:monitor/stats] is missing, " +
"privilege for action [indices:data/read/xpack/ccr/shard_changes] is missing"));
// Verify that the follow index has not been created and no node tasks are running
assertThat(indexExists(adminClient(), unallowedIndex), is(false));
assertBusy(() -> assertThat(countCcrNodeTasks(), equalTo(0)));

e = expectThrows(ResponseException.class,
() -> resumeFollow("leader_cluster:" + unallowedIndex, unallowedIndex));
assertThat(e.getMessage(), containsString("action [indices:monitor/stats] is unauthorized for user [test_ccr]"));
assertThat(e.getMessage(), containsString("insufficient privileges to follow index [unallowed-index], " +
"privilege for action [indices:monitor/stats] is missing, " +
"privilege for action [indices:data/read/xpack/ccr/shard_changes] is missing"));
assertThat(indexExists(adminClient(), unallowedIndex), is(false));
assertBusy(() -> assertThat(countCcrNodeTasks(), equalTo(0)));
}
Expand All @@ -125,8 +139,15 @@ public void testAutoFollowPatterns() throws Exception {
String allowedIndex = "logs-eu-20190101";
String disallowedIndex = "logs-us-20190101";

{
Request request = new Request("PUT", "/_ccr/auto_follow/leader_cluster");
request.setJsonEntity("{\"leader_index_patterns\": [\"logs-*\"]}");
Exception e = expectThrows(ResponseException.class, () -> assertOK(client().performRequest(request)));
assertThat(e.getMessage(), containsString("insufficient privileges to follow index [logs-*]"));
}

Request request = new Request("PUT", "/_ccr/auto_follow/leader_cluster");
request.setJsonEntity("{\"leader_index_patterns\": [\"logs-*\"]}");
request.setJsonEntity("{\"leader_index_patterns\": [\"logs-eu-*\"]}");
assertOK(client().performRequest(request));

try (RestClient leaderClient = buildLeaderClient()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.elasticsearch.action.support.ContextPreservingActionListener;
import org.elasticsearch.action.admin.indices.stats.IndexShardStats;
import org.elasticsearch.action.admin.indices.stats.IndexStats;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.action.admin.indices.stats.ShardStats;
Expand All @@ -25,6 +26,8 @@
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.engine.CommitStats;
import org.elasticsearch.index.engine.Engine;
Expand All @@ -33,8 +36,16 @@
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.xpack.ccr.action.ShardFollowTask;
import org.elasticsearch.xpack.ccr.action.ShardChangesAction;
import org.elasticsearch.xpack.core.XPackPlugin;
import org.elasticsearch.xpack.core.security.SecurityContext;
import org.elasticsearch.xpack.core.security.action.user.HasPrivilegesAction;
import org.elasticsearch.xpack.core.security.action.user.HasPrivilegesRequest;
import org.elasticsearch.xpack.core.security.action.user.HasPrivilegesResponse;
import org.elasticsearch.xpack.core.security.authz.RoleDescriptor;
import org.elasticsearch.xpack.core.security.support.Exceptions;

import java.util.Arrays;
import java.util.Collections;
import java.util.Locale;
import java.util.Map;
Expand All @@ -52,21 +63,24 @@
public final class CcrLicenseChecker {

private final BooleanSupplier isCcrAllowed;
private final BooleanSupplier isAuthAllowed;

/**
* Constructs a CCR license checker with the default rule based on the license state for checking if CCR is allowed.
*/
CcrLicenseChecker() {
this(XPackPlugin.getSharedLicenseState()::isCcrAllowed);
this(XPackPlugin.getSharedLicenseState()::isCcrAllowed, XPackPlugin.getSharedLicenseState()::isAuthAllowed);
}

/**
* Constructs a CCR license checker with the specified boolean supplier.
* Constructs a CCR license checker with the specified boolean suppliers.
*
* @param isCcrAllowed a boolean supplier that should return true if CCR is allowed and false otherwise
* @param isCcrAllowed a boolean supplier that should return true if CCR is allowed and false otherwise
* @param isAuthAllowed a boolean supplier that should return true if security, authentication, and authorization is allowed
*/
public CcrLicenseChecker(final BooleanSupplier isCcrAllowed) {
this.isCcrAllowed = Objects.requireNonNull(isCcrAllowed);
public CcrLicenseChecker(final BooleanSupplier isCcrAllowed, final BooleanSupplier isAuthAllowed) {
this.isCcrAllowed = Objects.requireNonNull(isCcrAllowed, "isCcrAllowed");
this.isAuthAllowed = Objects.requireNonNull(isAuthAllowed, "isAuthAllowed");
}

/**
Expand Down Expand Up @@ -116,8 +130,13 @@ public <T> void checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUU
}

final Client leaderClient = client.getRemoteClusterClient(clusterAlias);
fetchLeaderHistoryUUIDs(leaderClient, leaderIndexMetaData, onFailure, historyUUIDs -> {
consumer.accept(historyUUIDs, leaderIndexMetaData);
hasPrivilegesToFollowIndices(leaderClient, new String[] {leaderIndex}, e -> {
if (e == null) {
fetchLeaderHistoryUUIDs(leaderClient, leaderIndexMetaData, onFailure, historyUUIDs ->
consumer.accept(historyUUIDs, leaderIndexMetaData));
} else {
onFailure.accept(e);
}
});
},
licenseCheck -> indexMetadataNonCompliantRemoteLicense(leaderIndex, licenseCheck),
Expand All @@ -136,9 +155,8 @@ public <T> void checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUU
* @param request the cluster state request
* @param onFailure the failure consumer
* @param leaderClusterStateConsumer the leader cluster state consumer
* @param <T> the type of response the listener is waiting for
*/
public <T> void checkRemoteClusterLicenseAndFetchClusterState(
public void checkRemoteClusterLicenseAndFetchClusterState(
final Client client,
final Map<String, String> headers,
final String clusterAlias,
Expand Down Expand Up @@ -259,6 +277,64 @@ public void fetchLeaderHistoryUUIDs(
leaderClient.admin().indices().stats(request, ActionListener.wrap(indicesStatsHandler, onFailure));
}

/**
* Check if the user executing the current action has privileges to follow the specified indices on the cluster specified by the leader
* client. The specified callback will be invoked with null if the user has the necessary privileges to follow the specified indices,
* otherwise the callback will be invoked with an exception outlining the authorization error.
*
* @param leaderClient the leader client
* @param indices the indices
* @param handler the callback
*/
public void hasPrivilegesToFollowIndices(final Client leaderClient, final String[] indices, final Consumer<Exception> handler) {
Objects.requireNonNull(leaderClient, "leaderClient");
Objects.requireNonNull(indices, "indices");
if (indices.length == 0) {
throw new IllegalArgumentException("indices must not be empty");
}
Objects.requireNonNull(handler, "handler");
if (isAuthAllowed.getAsBoolean() == false) {
handler.accept(null);
return;
}

ThreadContext threadContext = leaderClient.threadPool().getThreadContext();
SecurityContext securityContext = new SecurityContext(Settings.EMPTY, threadContext);
String username = securityContext.getUser().principal();

RoleDescriptor.IndicesPrivileges privileges = RoleDescriptor.IndicesPrivileges.builder()
.indices(indices)
.privileges(IndicesStatsAction.NAME, ShardChangesAction.NAME)
.build();

HasPrivilegesRequest request = new HasPrivilegesRequest();
request.username(username);
request.clusterPrivileges(Strings.EMPTY_ARRAY);
request.indexPrivileges(privileges);
request.applicationPrivileges(new RoleDescriptor.ApplicationResourcePrivileges[0]);
CheckedConsumer<HasPrivilegesResponse, Exception> responseHandler = response -> {
if (response.isCompleteMatch()) {
handler.accept(null);
} else {
StringBuilder message = new StringBuilder("insufficient privileges to follow");
message.append(indices.length == 1 ? " index " : " indices ");
message.append(Arrays.toString(indices));

HasPrivilegesResponse.ResourcePrivileges resourcePrivileges = response.getIndexPrivileges().get(0);
for (Map.Entry<String, Boolean> entry : resourcePrivileges.getPrivileges().entrySet()) {
if (entry.getValue() == false) {
message.append(", privilege for action [");
message.append(entry.getKey());
message.append("] is missing");
}
}

handler.accept(Exceptions.authorizationError(message.toString()));
}
};
leaderClient.execute(HasPrivilegesAction.INSTANCE, request, ActionListener.wrap(responseHandler, handler));
}

public static Client wrapClient(Client client, Map<String, String> headers) {
if (headers.isEmpty()) {
return client;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,26 +91,33 @@ protected void masterOperation(PutAutoFollowPatternAction.Request request,
.filter(e -> ShardFollowTask.HEADER_FILTERS.contains(e.getKey()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

leaderClient.admin().cluster().state(
clusterStateRequest,
ActionListener.wrap(
String[] indices = request.getLeaderIndexPatterns().toArray(new String[0]);
ccrLicenseChecker.hasPrivilegesToFollowIndices(leaderClient, indices, e -> {
if (e == null) {
leaderClient.admin().cluster().state(
clusterStateRequest,
ActionListener.wrap(
clusterStateResponse -> {
final ClusterState leaderClusterState = clusterStateResponse.getState();
clusterService.submitStateUpdateTask("put-auto-follow-pattern-" + request.getLeaderClusterAlias(),
new AckedClusterStateUpdateTask<AcknowledgedResponse>(request, listener) {

@Override
protected AcknowledgedResponse newResponse(boolean acknowledged) {
return new AcknowledgedResponse(acknowledged);
}

@Override
public ClusterState execute(ClusterState currentState) throws Exception {
return innerPut(request, filteredHeaders, currentState, leaderClusterState);
}
});
new AckedClusterStateUpdateTask<AcknowledgedResponse>(request, listener) {

@Override
protected AcknowledgedResponse newResponse(boolean acknowledged) {
return new AcknowledgedResponse(acknowledged);
}

@Override
public ClusterState execute(ClusterState currentState) throws Exception {
return innerPut(request, filteredHeaders, currentState, leaderClusterState);
}
});
},
listener::onFailure));
} else {
listener.onFailure(e);
}
});
}

static ClusterState innerPut(PutAutoFollowPatternAction.Request request,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,10 +127,16 @@ private void createFollowerIndexAndFollowLocalIndex(
return;
}

Consumer<String[]> handler = historyUUIDs -> {
Consumer<String[]> historyUUIDhandler = historyUUIDs -> {
createFollowerIndex(leaderIndexMetadata, historyUUIDs, request, listener);
};
ccrLicenseChecker.fetchLeaderHistoryUUIDs(client, leaderIndexMetadata, listener::onFailure, handler);
ccrLicenseChecker.hasPrivilegesToFollowIndices(client, new String[] {leaderIndex}, e -> {
if (e == null) {
ccrLicenseChecker.fetchLeaderHistoryUUIDs(client, leaderIndexMetadata, listener::onFailure, historyUUIDhandler);
} else {
listener.onFailure(e);
}
});
}

private void createFollowerIndexAndFollowRemoteIndex(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,10 +124,16 @@ private void followLocalIndex(final ResumeFollowAction.Request request,
if (leaderIndexMetadata == null) {
throw new IndexNotFoundException(request.getFollowerIndex());
}
ccrLicenseChecker.fetchLeaderHistoryUUIDs(client, leaderIndexMetadata, listener::onFailure, historyUUIDs -> {
try {
start(request, null, leaderIndexMetadata, followerIndexMetadata, historyUUIDs, listener);
} catch (final IOException e) {
ccrLicenseChecker.hasPrivilegesToFollowIndices(client, new String[] {request.getLeaderIndex()}, e -> {
if (e == null) {
ccrLicenseChecker.fetchLeaderHistoryUUIDs(client, leaderIndexMetadata, listener::onFailure, historyUUIDs -> {
try {
start(request, null, leaderIndexMetadata, followerIndexMetadata, historyUUIDs, listener);
} catch (final IOException ioe) {
listener.onFailure(ioe);
}
});
} else {
listener.onFailure(e);
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public void testGetEngineFactory() throws IOException {
.numberOfShards(1)
.numberOfReplicas(0)
.build();
final Ccr ccr = new Ccr(Settings.EMPTY, new CcrLicenseChecker(() -> true));
final Ccr ccr = new Ccr(Settings.EMPTY, new CcrLicenseChecker(() -> true, () -> false));
final Optional<EngineFactory> engineFactory = ccr.getEngineFactory(new IndexSettings(indexMetaData, Settings.EMPTY));
if (value != null && value) {
assertTrue(engineFactory.isPresent());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public class LocalStateCcr extends LocalStateCompositeXPackPlugin {
public LocalStateCcr(final Settings settings, final Path configPath) throws Exception {
super(settings, configPath);

plugins.add(new Ccr(settings, new CcrLicenseChecker(() -> true)) {
plugins.add(new Ccr(settings, new CcrLicenseChecker(() -> true, () -> false)) {

@Override
protected XPackLicenseState getLicenseState() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public class NonCompliantLicenseLocalStateCcr extends LocalStateCompositeXPackPl
public NonCompliantLicenseLocalStateCcr(final Settings settings, final Path configPath) throws Exception {
super(settings, configPath);

plugins.add(new Ccr(settings, new CcrLicenseChecker(() -> false)) {
plugins.add(new Ccr(settings, new CcrLicenseChecker(() -> false, () -> false)) {

@Override
protected XPackLicenseState getLicenseState() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ public void testStats() {
null,
null,
mock(ClusterService.class),
new CcrLicenseChecker(() -> true)
new CcrLicenseChecker(() -> true, () -> false)
);

autoFollowCoordinator.updateStats(Collections.singletonList(
Expand Down
Loading

0 comments on commit a984f8a

Please sign in to comment.