Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use origin for the client when running _features/_reset #88622

Merged
merged 11 commits into from
Jul 22, 2022
6 changes: 6 additions & 0 deletions docs/changelog/88622.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 88622
summary: Use origin for the client when running _features/_reset
area: Infra/Core
type: bug
issues:
- 88617
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.util.List;
import java.util.function.Supplier;

import static org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskAction.TASKS_ORIGIN;
import static org.elasticsearch.index.mapper.MapperService.SINGLE_MAPPING_NAME;
import static org.elasticsearch.rest.RestRequest.Method.POST;
import static org.elasticsearch.rest.RestRequest.Method.PUT;
Expand Down Expand Up @@ -73,15 +74,15 @@ public Collection<SystemIndexDescriptor> getSystemIndexDescriptors(Settings sett
.put(IndexMetadata.SETTING_AUTO_EXPAND_REPLICAS, "0-1")
.build()
)
.setOrigin("net-new")
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fake origins don't work anymore, since we now actually switch the security context based on the descriptor's origin. I picked the tasks origin since it's not defined in a module.

.setOrigin(TASKS_ORIGIN)
.setVersionMetaKey("version")
.setPrimaryIndex(".net-new-system-index-" + Version.CURRENT.major)
.build(),
SystemIndexDescriptor.builder()
.setIndexPattern(INTERNAL_UNMANAGED_INDEX_NAME)
.setDescription("internal unmanaged system index")
.setType(SystemIndexDescriptor.Type.INTERNAL_UNMANAGED)
.setOrigin("qa")
.setOrigin(TASKS_ORIGIN)
.setAliasName(".internal-unmanaged-alias")
.build(),
SystemIndexDescriptor.builder()
Expand All @@ -96,7 +97,7 @@ public Collection<SystemIndexDescriptor> getSystemIndexDescriptors(Settings sett
.put(IndexMetadata.SETTING_AUTO_EXPAND_REPLICAS, "0-1")
.build()
)
.setOrigin("qa")
.setOrigin(TASKS_ORIGIN)
.setVersionMetaKey("version")
.setPrimaryIndex(".internal-managed-index-" + Version.CURRENT.major)
.setAliasName(".internal-managed-alias")
Expand Down
92 changes: 76 additions & 16 deletions server/src/main/java/org/elasticsearch/indices/SystemIndices.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,22 @@

package org.elasticsearch.indices;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.automaton.Automata;
import org.apache.lucene.util.automaton.Automaton;
import org.apache.lucene.util.automaton.CharacterRunAutomaton;
import org.apache.lucene.util.automaton.MinimizationOperations;
import org.apache.lucene.util.automaton.Operations;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.snapshots.features.ResetFeatureStateResponse;
import org.elasticsearch.action.admin.cluster.snapshots.features.ResetFeatureStateResponse.ResetFeatureStateStatus;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexAction;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.support.GroupedActionListener;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.client.internal.OriginSettingClient;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
Expand Down Expand Up @@ -105,6 +110,8 @@ public class SystemIndices {

private static final Automaton EMPTY = Automata.makeEmpty();

private static final Logger logger = LogManager.getLogger(SystemIndices.class);

/**
* This is the source for non-plugin system features.
*/
Expand Down Expand Up @@ -328,6 +335,11 @@ public ExecutorSelector getExecutorSelector() {
* @return The matching {@link SystemIndexDescriptor} or {@code null} if no descriptor is found
*/
public @Nullable SystemIndexDescriptor findMatchingDescriptor(String name) {
return findMatchingDescriptor(indexDescriptors, name);
}

@Nullable
static SystemIndexDescriptor findMatchingDescriptor(SystemIndexDescriptor[] indexDescriptors, String name) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not objecting necessarily, but why break this out? On my first pass I assumed it was for testing, given that it's package-private, but it looks like there are no tests that use this version.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right, no need for that to be separate.

SystemIndexDescriptor matchingDescriptor = null;
for (SystemIndexDescriptor systemIndexDescriptor : indexDescriptors) {
if (systemIndexDescriptor.matchesIndexPattern(name)) {
Expand Down Expand Up @@ -854,6 +866,27 @@ public MigrationCompletionHandler getPostMigrationFunction() {
return postMigrationFunction;
}

/**
 * Deletes the given indices on behalf of a feature and reports the outcome as a reset status.
 * <p>
 * Both outcomes are delivered through {@code listener.onResponse}: a successful delete produces
 * {@code ResetFeatureStateStatus.success(name)}, and a failed delete produces
 * {@code ResetFeatureStateStatus.failure(name, e)}. This method itself never calls
 * {@code listener.onFailure}.
 *
 * @param name       Name of the feature being reset, used to label the resulting status
 * @param client     Client used to execute the delete-index request
 * @param indexNames Names of the indices to delete
 * @param listener   Receives the success or failure status of the delete
 */
private static void cleanUpFeatureForIndices(
    String name,
    Client client,
    String[] indexNames,
    final ActionListener<ResetFeatureStateStatus> listener
) {
    final ActionListener<AcknowledgedResponse> deletionListener = new ActionListener<>() {
        @Override
        public void onResponse(AcknowledgedResponse ignored) {
            listener.onResponse(ResetFeatureStateStatus.success(name));
        }

        @Override
        public void onFailure(Exception cause) {
            // The failure is converted into a FAILURE status and handed to onResponse rather than onFailure.
            listener.onResponse(ResetFeatureStateStatus.failure(name, cause));
        }
    };

    final DeleteIndexRequest request = new DeleteIndexRequest();
    request.indices(indexNames);
    client.execute(DeleteIndexAction.INSTANCE, request, deletionListener);
}

/**
* Clean up the state of a feature
* @param indexDescriptors List of descriptors of a feature's system indices
Expand All @@ -864,39 +897,66 @@ public MigrationCompletionHandler getPostMigrationFunction() {
* @param listener A listener to return success or failure of cleanup
*/
public static void cleanUpFeature(
Collection<? extends IndexPatternMatcher> indexDescriptors,
Collection<SystemIndexDescriptor> indexDescriptors,
Collection<? extends IndexPatternMatcher> associatedIndexDescriptors,
String name,
ClusterService clusterService,
Client client,
ActionListener<ResetFeatureStateStatus> listener
final ActionListener<ResetFeatureStateStatus> listener
) {
Metadata metadata = clusterService.state().getMetadata();

List<String> allIndices = Stream.concat(indexDescriptors.stream(), associatedIndexDescriptors.stream())
List<String> associatedIndices = associatedIndexDescriptors.stream()
.map(descriptor -> descriptor.getMatchingIndices(metadata))
.flatMap(List::stream)
.toList();

if (allIndices.isEmpty()) {
// if no actual indices match the pattern, we can stop here
final int taskCount = ((associatedIndices.size() > 0) ? 1 : 0) + (int) indexDescriptors.stream()
.filter(id -> id.getMatchingIndices(metadata).isEmpty() == false)
.count();

// check if there's nothing to do and take an early out
if (taskCount == 0) {
listener.onResponse(ResetFeatureStateStatus.success(name));
return;
}

DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest();
deleteIndexRequest.indices(allIndices.toArray(Strings.EMPTY_ARRAY));
client.execute(DeleteIndexAction.INSTANCE, deleteIndexRequest, new ActionListener<>() {
@Override
public void onResponse(AcknowledgedResponse acknowledgedResponse) {
listener.onResponse(ResetFeatureStateStatus.success(name));
}
GroupedActionListener<ResetFeatureStateStatus> groupedListener = new GroupedActionListener<>(
ActionListener.wrap(listenerResults -> {
List<ResetFeatureStateStatus> errors = listenerResults.stream()
.filter(status -> status.getStatus() == ResetFeatureStateResponse.ResetFeatureStateStatus.Status.FAILURE)
.collect(Collectors.toList());

@Override
public void onFailure(Exception e) {
listener.onResponse(ResetFeatureStateStatus.failure(name, e));
if (errors.isEmpty()) {
listener.onResponse(ResetFeatureStateStatus.success(name));
} else {
StringBuilder exceptions = new StringBuilder("[");
exceptions.append(errors.stream().map(e -> e.getException().getMessage()).collect(Collectors.joining(", ")));
exceptions.append(']');
errors.forEach(e -> logger.warn(() -> "error while resetting feature [" + name + "]", e.getException()));
listener.onResponse(ResetFeatureStateStatus.failure(name, new Exception(exceptions.toString())));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you also add logging that captures the full stack traces? It's nitpicky but it's no fun when we hit a weird error and realize there's no way to get the actual stack trace.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, sounds good.

}
}, listener::onFailure),
taskCount
);

// Send cleanup for the associated indices, they don't need special origin since they are not protected
if (associatedIndices.isEmpty() == false) {
cleanUpFeatureForIndices(name, client, associatedIndices.toArray(Strings.EMPTY_ARRAY), groupedListener);
}

// One descriptor at a time, create an originating client and clean up the feature
for (var indexDescriptor : indexDescriptors) {
List<String> matchingIndices = indexDescriptor.getMatchingIndices(metadata);

if (matchingIndices.isEmpty() == false) {
final Client clientWithOrigin = (indexDescriptor.getOrigin() == null)
? client
: new OriginSettingClient(client, indexDescriptor.getOrigin());

cleanUpFeatureForIndices(name, clientWithOrigin, matchingIndices.toArray(Strings.EMPTY_ARRAY), groupedListener);
}
});
}
}

// No-op pre-migration function to be used as the default in case none are provided.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -719,10 +719,16 @@ public void cleanUpFeature(
List<SystemIndexPlugin> systemPlugins = filterPlugins(SystemIndexPlugin.class);

GroupedActionListener<ResetFeatureStateResponse.ResetFeatureStateStatus> allListeners = new GroupedActionListener<>(
ActionListener.wrap(
listenerResults -> finalListener.onResponse(ResetFeatureStateStatus.success(getFeatureName())),
finalListener::onFailure
),
ActionListener.wrap(listenerResults -> {
// If the clean-up produced exactly one result, pass that result along. In most
// cases there should be a 1-1 mapping of feature to response. Unconditionally
// passing back success would prevent us from writing validation tests on this API.
if (listenerResults != null && listenerResults.size() == 1) {
finalListener.onResponse(listenerResults.stream().findFirst().get());
} else {
finalListener.onResponse(ResetFeatureStateStatus.success(getFeatureName()));
}
}, finalListener::onFailure),
systemPlugins.size()
);
systemPlugins.forEach(plugin -> plugin.cleanUpFeature(clusterService, client, allListeners));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.integration;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.snapshots.features.ResetFeatureStateAction;
import org.elasticsearch.action.admin.cluster.snapshots.features.ResetFeatureStateRequest;
import org.elasticsearch.action.admin.cluster.snapshots.features.ResetFeatureStateResponse;
import org.elasticsearch.common.settings.SecureString;
import org.elasticsearch.test.SecurityIntegTestCase;
import org.elasticsearch.test.SecuritySettingsSource;
import org.elasticsearch.test.SecuritySettingsSourceField;
import org.elasticsearch.test.TestSecurityClient;
import org.elasticsearch.xpack.core.security.user.User;
import org.junit.Before;

import java.util.Collections;

import static org.elasticsearch.xpack.core.security.authc.support.UsernamePasswordToken.BASIC_AUTH_HEADER;
import static org.elasticsearch.xpack.core.security.authc.support.UsernamePasswordToken.basicAuthHeaderValue;
import static org.hamcrest.Matchers.containsString;

/**
* These tests ensure that the Feature Reset API works for users with default superuser and manage roles.
* This can be complex due to restrictions on system indices and the need to use the correct origin for
* each index. See also https://github.com/elastic/elasticsearch/issues/88617
*/
public class SecurityFeatureResetTests extends SecurityIntegTestCase {
grcevski marked this conversation as resolved.
Show resolved Hide resolved
private static final SecureString SUPER_USER_PASSWD = SecuritySettingsSourceField.TEST_PASSWORD_SECURE_STRING;

@Override
protected boolean addMockHttpTransport() {
return false; // enable http
}

@Before
public void setupForTests() throws Exception {
// adds a dummy user to the native realm to force .security index creation
new TestSecurityClient(getRestClient(), SecuritySettingsSource.SECURITY_REQUEST_OPTIONS).putUser(
new User("dummy_user", "missing_role"),
SecuritySettingsSourceField.TEST_PASSWORD_SECURE_STRING
);
assertSecurityIndexActive();
}

@Override
protected String configUsers() {
final String usersPasswHashed = new String(getFastStoredHashAlgoForTests().hash(SUPER_USER_PASSWD));
return super.configUsers()
+ "su:"
+ usersPasswHashed
+ "\n"
+ "manager:"
+ usersPasswHashed
+ "\n"
+ "usr:"
+ usersPasswHashed
+ "\n";
}

@Override
protected String configUsersRoles() {
return super.configUsersRoles() + """
superuser:su
role1:manager
role2:usr""";
}

@Override
protected String configRoles() {
return super.configRoles() + """
%s
role1:
cluster: [ manage ]
indices:
- names: '*'
privileges: [ manage ]
role2:
cluster: [ monitor ]
indices:
- names: '*'
privileges: [ read ]
""";
}

public void testFeatureResetSuperuser() {
assertResetSuccessful("su", SUPER_USER_PASSWD);
}

public void testFeatureResetManageRole() {
assertResetSuccessful("manager", SUPER_USER_PASSWD);
}

public void testFeatureResetNoManageRole() {
final ResetFeatureStateRequest req = new ResetFeatureStateRequest();

client().filterWithHeader(Collections.singletonMap(BASIC_AUTH_HEADER, basicAuthHeaderValue("usr", SUPER_USER_PASSWD)))
.admin()
.cluster()
.execute(ResetFeatureStateAction.INSTANCE, req, new ActionListener<>() {
@Override
public void onResponse(ResetFeatureStateResponse response) {
fail("Shouldn't reach here");
}

@Override
public void onFailure(Exception e) {
assertThat(
e.getMessage(),
containsString("action [cluster:admin/features/reset] is unauthorized for user [usr] with roles [role2]")
);
}
});

// Manually delete the security index, reset shouldn't work
deleteSecurityIndex();
}

private void assertResetSuccessful(String user, SecureString password) {
final ResetFeatureStateRequest req = new ResetFeatureStateRequest();

client().filterWithHeader(Collections.singletonMap(BASIC_AUTH_HEADER, basicAuthHeaderValue(user, password)))
.admin()
.cluster()
.execute(ResetFeatureStateAction.INSTANCE, req, new ActionListener<>() {
@Override
public void onResponse(ResetFeatureStateResponse response) {
long failures = response.getFeatureStateResetStatuses()
.stream()
.filter(status -> status.getStatus() == ResetFeatureStateResponse.ResetFeatureStateStatus.Status.FAILURE)
.count();
assertEquals(0, failures);
}

@Override
public void onFailure(Exception e) {
fail("Shouldn't reach here");
}
});
}
}