Skip to content

Commit

Permalink
Fix downsample failure when FLS/DLS is enabled.
Browse files Browse the repository at this point in the history
If FLS/DLS is enabled (this is the case when trial/licence is active and security is enabled) then invoking the downsample API results in immediate failure.

The downsample shard persistent task executor opens a searcher, but security didn't set indices permissions in the thread local (this happens via SecurityActionFilter). This will only happen on indices actions (which are actions with a request that implement IndicesRequest. This change does this by delegating to a transport action that executes always locally, and this way security prepares thread local headers correctly.

This adds another layer of indirection, but without doing this FLS/DLS wouldn't work.

Closes elastic#98569
  • Loading branch information
martijnvg committed Aug 17, 2023
1 parent 0754bd8 commit ec5cc8d
Show file tree
Hide file tree
Showing 7 changed files with 614 additions and 153 deletions.
34 changes: 34 additions & 0 deletions x-pack/plugin/downsample/qa/with-security/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import org.elasticsearch.gradle.Version
import org.elasticsearch.gradle.internal.info.BuildParams

apply plugin: 'elasticsearch.legacy-yaml-rest-test'
apply plugin: 'elasticsearch.legacy-yaml-rest-compat-test'

dependencies {
yamlRestTestImplementation project(path: xpackModule('rollup'))
}

restResources {
restApi {
include '_common', 'bulk', 'cluster', 'indices', 'search', 'ingest.put_pipeline', 'ingest.delete_pipeline'
}
}

testClusters.configureEach {
testDistribution = 'DEFAULT'
setting 'xpack.license.self_generated.type', 'trial'
setting 'xpack.security.enabled', 'true'
user username: 'elastic_admin', password: 'admin-password'
}

if (BuildParams.inFipsJvm){
// This test cluster is using a BASIC license and FIPS 140 mode is not supported in BASIC
tasks.named("yamlRestTest").configure{enabled = false }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.downsample;

import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;

import org.elasticsearch.common.settings.SecureString;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.test.rest.yaml.ClientYamlTestCandidate;
import org.elasticsearch.test.rest.yaml.ESClientYamlSuiteTestCase;

public class DownsampleRestIT extends ESClientYamlSuiteTestCase {

public DownsampleRestIT(final ClientYamlTestCandidate testCandidate) {
super(testCandidate);
}

@Override
protected Settings restClientSettings() {
String authentication = basicAuthHeaderValue("elastic_admin", new SecureString("admin-password".toCharArray()));
return Settings.builder().put(super.restClientSettings()).put(ThreadContext.PREFIX + ".Authorization", authentication).build();
}

@ParametersFactory
public static Iterable<Object[]> parameters() throws Exception {
return ESClientYamlSuiteTestCase.createParameters();
}

}

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.settings.ClusterSettings;
Expand All @@ -22,31 +21,23 @@
import org.elasticsearch.common.settings.SettingsFilter;
import org.elasticsearch.common.settings.SettingsModule;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.persistent.PersistentTaskParams;
import org.elasticsearch.persistent.PersistentTaskState;
import org.elasticsearch.persistent.PersistentTasksExecutor;
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.plugins.PersistentTaskPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestHandler;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.threadpool.ExecutorBuilder;
import org.elasticsearch.threadpool.FixedExecutorBuilder;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.tracing.Tracer;
import org.elasticsearch.watcher.ResourceWatcherService;
import org.elasticsearch.xcontent.NamedXContentRegistry;
import org.elasticsearch.xcontent.ParseField;
import org.elasticsearch.xpack.core.downsample.DownsampleIndexerAction;
import org.elasticsearch.xpack.core.downsample.DownsampleShardPersistentTaskState;
import org.elasticsearch.xpack.core.downsample.DownsampleShardTask;

import java.util.Collection;
import java.util.List;
import java.util.function.Supplier;

Expand All @@ -55,8 +46,6 @@ public class Downsample extends Plugin implements ActionPlugin, PersistentTaskPl
public static final String DOWSAMPLE_TASK_THREAD_POOL_NAME = "downsample_indexing";
private static final int DOWNSAMPLE_TASK_THREAD_POOL_QUEUE_SIZE = 256;

private IndicesService indicesService;

@Override
public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
final FixedExecutorBuilder downsample = new FixedExecutorBuilder(
Expand All @@ -74,7 +63,11 @@ public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
public List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() {
return List.of(
new ActionHandler<>(DownsampleIndexerAction.INSTANCE, TransportDownsampleIndexerAction.class),
new ActionHandler<>(DownsampleAction.INSTANCE, TransportDownsampleAction.class)
new ActionHandler<>(DownsampleAction.INSTANCE, TransportDownsampleAction.class),
new ActionHandler<>(
DownsampleShardPersistentTaskExecutor.DelegatingAction.INSTANCE,
DownsampleShardPersistentTaskExecutor.DelegatingAction.TA.class
)
);
}

Expand All @@ -99,14 +92,7 @@ public List<PersistentTasksExecutor<?>> getPersistentTasksExecutor(
SettingsModule settingsModule,
IndexNameExpressionResolver expressionResolver
) {
return List.of(
new DownsampleShardPersistentTaskExecutor(
client,
this.indicesService,
DownsampleShardTask.TASK_NAME,
DOWSAMPLE_TASK_THREAD_POOL_NAME
)
);
return List.of(new DownsampleShardPersistentTaskExecutor(client, DownsampleShardTask.TASK_NAME, DOWSAMPLE_TASK_THREAD_POOL_NAME));
}

@Override
Expand Down Expand Up @@ -136,46 +122,8 @@ public List<NamedWriteableRegistry.Entry> getNamedWriteables() {
new NamedWriteableRegistry.Entry(
PersistentTaskParams.class,
DownsampleShardTaskParams.NAME,
DownsampleShardTaskParams::readFromStream
DownsampleShardTaskParams::new
)
);
}

@Override
public Collection<Object> createComponents(
final Client client,
final ClusterService clusterService,
final ThreadPool threadPool,
final ResourceWatcherService resourceWatcherService,
final ScriptService scriptService,
final NamedXContentRegistry xContentRegistry,
final Environment environment,
final NodeEnvironment nodeEnvironment,
final NamedWriteableRegistry namedWriteableRegistry,
final IndexNameExpressionResolver indexNameExpressionResolver,
final Supplier<RepositoriesService> repositoriesServiceSupplier,
final Tracer tracer,
final AllocationService allocationService,
final IndicesService indicesService
) {
final Collection<Object> components = super.createComponents(
client,
clusterService,
threadPool,
resourceWatcherService,
scriptService,
xContentRegistry,
environment,
nodeEnvironment,
namedWriteableRegistry,
indexNameExpressionResolver,
repositoriesServiceSupplier,
tracer,
allocationService,
indicesService
);

this.indicesService = indicesService;
return components;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -172,10 +172,6 @@ public DownsampleIndexerAction.ShardDownsampleResponse execute() throws IOExcept

if (task.getNumIndexed() != task.getNumSent()) {
task.setDownsampleShardIndexerStatus(DownsampleShardIndexerStatus.FAILED);
task.updatePersistentTaskState(
new DownsampleShardPersistentTaskState(DownsampleShardIndexerStatus.FAILED, null),
ActionListener.noop()
);
final String error = "Downsampling task ["
+ task.getPersistentTaskId()
+ "] on shard "
Expand All @@ -199,10 +195,6 @@ public DownsampleIndexerAction.ShardDownsampleResponse execute() throws IOExcept
+ task.getNumFailed()
+ "]";
logger.info(error);
task.updatePersistentTaskState(
new DownsampleShardPersistentTaskState(DownsampleShardIndexerStatus.FAILED, null),
ActionListener.noop()
);
throw new DownsampleShardIndexerException(error, false);
}

Expand Down
Loading

0 comments on commit ec5cc8d

Please sign in to comment.