Skip to content

Commit

Permalink
Add tracing for authorization (#80815)
Browse files Browse the repository at this point in the history
* Can be turned off with xpack.security.authz.tracing: false
* AuthZ is tied to relevant task by traceparent (it however does not
  cover the first authZ)
* x-opaque-id is auto configured at rest layer if not already exists.
  This helps chain all relevant actions together.
* ApmIT now has security enabled.
  • Loading branch information
ywangd authored Nov 19, 2021
1 parent 76a9414 commit f3f9835
Show file tree
Hide file tree
Showing 12 changed files with 237 additions and 26 deletions.
9 changes: 8 additions & 1 deletion server/src/main/java/org/elasticsearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
Expand Down Expand Up @@ -753,7 +754,13 @@ protected Node(
final IndexingPressure indexingLimits = new IndexingPressure(settings);

final TaskTracer taskTracer = transportService.getTaskManager().getTaskTracer();
pluginComponents.stream().map(c -> c instanceof Tracer ? (Tracer) c : null).forEach(taskTracer::addTracer);
final List<Tracer> tracers = pluginComponents.stream()
.map(c -> c instanceof Tracer ? (Tracer) c : null)
.filter(Objects::nonNull)
.collect(Collectors.toUnmodifiableList());
tracers.forEach(taskTracer::addTracer);

pluginsService.filterPlugins(Plugin.class).forEach(plugin -> plugin.onTracers(tracers));

final RecoverySettings recoverySettings = new RecoverySettings(settings, settingsModule.getClusterSettings());
RepositoriesModule repositoriesModule = new RepositoriesModule(
Expand Down
6 changes: 6 additions & 0 deletions server/src/main/java/org/elasticsearch/plugins/Plugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.threadpool.ExecutorBuilder;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.tracing.Tracer;
import org.elasticsearch.watcher.ResourceWatcherService;
import org.elasticsearch.xcontent.NamedXContentRegistry;
import org.elasticsearch.xcontent.XContentParser;
Expand Down Expand Up @@ -204,4 +205,9 @@ public void close() throws IOException {
public Collection<IndexSettingProvider> getAdditionalIndexSettingProviders() {
return Collections.emptyList();
}

/**
* Called with a list of Tracers so that each plugin can have a chance to work with them.
*/
public void onTracers(List<Tracer> tracers) {}
}
7 changes: 7 additions & 0 deletions server/src/main/java/org/elasticsearch/tracing/Tracer.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

package org.elasticsearch.tracing;

import java.util.Map;

/**
* Represents a distributed tracing system that keeps track of the start and end of various activities in the cluster.
*/
Expand All @@ -22,4 +24,9 @@ public interface Tracer {
* Called when the {@link Traceable} activity ends.
*/
void onTraceStopped(Traceable traceable);

/**
* Retrieve context related headers for the span of the given id.
*/
Map<String, String> getSpanHeadersById(String id);
}
3 changes: 3 additions & 0 deletions x-pack/plugin/apm-integration/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ dependencies {

compileOnly project(path: xpackModule('core'))
internalClusterTestImplementation(testArtifact(project(xpackModule('core'))))
internalClusterTestImplementation(testArtifact(project(xpackModule('security')))) {
exclude group: 'com.google.guava', module: 'guava'
}
}

// no unit-test for now
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,25 @@
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchTransportService;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.settings.MockSecureSettings;
import org.elasticsearch.common.settings.SecureString;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.tasks.TaskTracer;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.SecurityIntegTestCase;
import org.elasticsearch.test.SecuritySettingsSource;
import org.elasticsearch.test.SecuritySettingsSourceField;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xcontent.XContentType;
import org.elasticsearch.xpack.core.security.authc.support.UsernamePasswordToken;
import org.junit.After;

import java.util.Collection;
Expand All @@ -44,7 +48,7 @@
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.notNullValue;

public class ApmIT extends ESIntegTestCase {
public class ApmIT extends SecurityIntegTestCase {

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
Expand All @@ -53,15 +57,22 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {

@Override
protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
final MockSecureSettings secureSettings = new MockSecureSettings();
secureSettings.setString(APMTracer.APM_ENDPOINT_SETTING.getKey(), System.getProperty("tests.apm.endpoint", ""));
secureSettings.setString(APMTracer.APM_TOKEN_SETTING.getKey(), System.getProperty("tests.apm.token", ""));

return Settings.builder()
.put(super.nodeSettings(nodeOrdinal, otherSettings))
.put(APMTracer.APM_ENABLED_SETTING.getKey(), true)
.setSecureSettings(secureSettings)
.build();
Settings.Builder builder = Settings.builder().put(super.nodeSettings(nodeOrdinal, otherSettings));
((MockSecureSettings) builder.getSecureSettings()).setString(
APMTracer.APM_ENDPOINT_SETTING.getKey(),
System.getProperty("tests.apm.endpoint", "")
);
((MockSecureSettings) builder.getSecureSettings()).setString(
APMTracer.APM_TOKEN_SETTING.getKey(),
System.getProperty("tests.apm.token", "")
);
builder.put(APMTracer.APM_ENABLED_SETTING.getKey(), true).put("xpack.security.authz.tracing", true);
return builder.build();
}

@Override
protected boolean addMockHttpTransport() {
return false;
}

@After
Expand Down Expand Up @@ -166,12 +177,23 @@ public void testSearch() throws Exception {
final APMTracer.CapturingSpanExporter spanExporter = APMTracer.CAPTURING_SPAN_EXPORTER;
spanExporter.clear();

client().prepareSearch()
.setQuery(new RangeQueryBuilder("@timestamp").gt("2021-11-01"))
.setSearchType(SearchType.QUERY_THEN_FETCH)
.setPreFilterShardSize(1)
.execute()
.actionGet(10, TimeUnit.SECONDS);
final Request searchRequest = new Request("GET", "_search");
searchRequest.addParameter("search_type", "query_then_fetch");
searchRequest.addParameter("pre_filter_shard_size", "1");
searchRequest.setJsonEntity("{\"query\":{\"range\":{\"@timestamp\":{\"gt\":\"2021-11-01\"}}}}");
searchRequest.setOptions(
searchRequest.getOptions()
.toBuilder()
.addHeader(
"Authorization",
UsernamePasswordToken.basicAuthHeaderValue(
SecuritySettingsSource.TEST_USER_NAME,
new SecureString(SecuritySettingsSourceField.TEST_PASSWORD.toCharArray())
)
)
);

final Response searchResponse = getRestClient().performRequest(searchRequest);

assertTrue(spanExporter.findSpanByName(SearchAction.NAME).findAny().isPresent());
assertTrue(spanExporter.findSpanByName(SearchTransportService.QUERY_CAN_MATCH_NODE_NAME).findAny().isPresent());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,10 @@ public void onTraceStarted(Traceable traceable) {
);
}
}
final String xOpaqueId = threadPool.getThreadContext().getHeader(Task.X_OPAQUE_ID);
if (xOpaqueId != null) {
spanBuilder.setAttribute("es.x-opaque-id", xOpaqueId);
}
return spanBuilder.startSpan();
});
}
Expand All @@ -239,6 +243,7 @@ private Context getParentSpanContext(OpenTelemetry openTelemetry) {
return null;
}

@Override
public Map<String, String> getSpanHeadersById(String id) {
var services = this.services;
var span = spans.get(id);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.security;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.tracing.Traceable;
import org.elasticsearch.tracing.Tracer;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class AuthorizationTracer {

private static final Logger logger = LogManager.getLogger(AuthorizationTracer.class);

private final ThreadContext threadContext;
private final List<Tracer> tracers = new CopyOnWriteArrayList<>();

public AuthorizationTracer(ThreadContext threadContext) {
this.threadContext = threadContext;
}

public void addTracer(Tracer tracer) {
if (tracer != null) {
tracers.add(tracer);
}
}

public Runnable startTracing(Traceable traceable) {
for (Tracer tracer : tracers) {
try {
tracer.onTraceStarted(traceable);
} catch (Exception e) {
assert false : e;
logger.warn(
new ParameterizedMessage(
"authorization tracing listener [{}] failed on starting tracing of [{}][{}]",
tracer,
traceable.getSpanId(),
traceable.getSpanName()
),
e
);
}
}
return () -> {
for (Tracer tracer : tracers) {
try {
tracer.onTraceStopped(traceable);
} catch (Exception e) {
assert false : e;
logger.warn(
new ParameterizedMessage(
"authorization tracing listener [{}] failed on stopping tracing of [{}][{}]",
tracer,
traceable.getSpanId(),
traceable.getSpanName()
),
e
);
}
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
import org.elasticsearch.threadpool.ExecutorBuilder;
import org.elasticsearch.threadpool.FixedExecutorBuilder;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.tracing.Tracer;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportInterceptor;
import org.elasticsearch.transport.TransportRequest;
Expand Down Expand Up @@ -470,6 +471,7 @@ public class Security extends Plugin
private final List<SecurityExtension> securityExtensions = new ArrayList<>();
private final SetOnce<Transport> transportReference = new SetOnce<>();
private final SetOnce<ScriptService> scriptServiceReference = new SetOnce<>();
private final SetOnce<AuthorizationTracer> authorizationTracerReference = new SetOnce<>();

public Security(Settings settings, final Path configPath) {
this(settings, configPath, Collections.emptyList());
Expand Down Expand Up @@ -810,6 +812,7 @@ Collection<Object> createComponents(
}
requestInterceptors = Collections.unmodifiableSet(requestInterceptors);

authorizationTracerReference.set(new AuthorizationTracer(threadContext.get()));
final AuthorizationService authzService = new AuthorizationService(
settings,
allRolesStore,
Expand All @@ -822,7 +825,8 @@ Collection<Object> createComponents(
requestInterceptors,
getLicenseState(),
expressionResolver,
operatorPrivilegesService
operatorPrivilegesService,
authorizationTracerReference.get()
);

components.add(nativeRolesStore); // used by roles actions
Expand Down Expand Up @@ -1602,6 +1606,15 @@ public void loadExtensions(ExtensionLoader loader) {
securityExtensions.addAll(loader.loadExtensions(SecurityExtension.class));
}

@Override
public void onTracers(List<Tracer> tracers) {
if (authorizationTracerReference.get() == null) {
// security is disabled
return;
}
tracers.forEach(t -> authorizationTracerReference.get().addTracer(t));
}

private synchronized NioGroupFactory getNioGroupFactory(Settings settings) {
if (nioGroupFactory.get() != null) {
assert nioGroupFactory.get().getSettings().equals(settings) : "Different settings than originally provided";
Expand Down
Loading

0 comments on commit f3f9835

Please sign in to comment.