Skip to content

Commit

Permalink
Move SlowLogFieldProvider instantiation to node construction (elastic…
Browse files Browse the repository at this point in the history
…#117949)

SPI from plugins should be created at node startup. This commit moves
creation of SlowLogFieldProvider into node construction and passes it in
to IndicesService so that it is not recreated on each index creation.

relates elastic#102103
  • Loading branch information
rjernst committed Jan 6, 2025
1 parent 8bf218b commit 3038cb5
Show file tree
Hide file tree
Showing 13 changed files with 200 additions and 164 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/117949.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 117949
summary: Move `SlowLogFieldProvider` instantiation to node construction
area: Infra/Logging
type: bug
issues: []
5 changes: 3 additions & 2 deletions server/src/main/java/org/elasticsearch/index/IndexModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -208,8 +208,9 @@ public IndexModule(
this.engineFactory = Objects.requireNonNull(engineFactory);
// Need to have a mutable arraylist for plugins to add listeners to it
this.searchOperationListeners = new ArrayList<>(searchOperationListeners);
this.searchOperationListeners.add(new SearchSlowLog(indexSettings, slowLogFieldProvider));
this.indexOperationListeners.add(new IndexingSlowLog(indexSettings, slowLogFieldProvider));
SlowLogFields slowLogFields = slowLogFieldProvider.create(indexSettings);
this.searchOperationListeners.add(new SearchSlowLog(indexSettings, slowLogFields));
this.indexOperationListeners.add(new IndexingSlowLog(indexSettings, slowLogFields));
this.directoryFactories = Collections.unmodifiableMap(directoryFactories);
this.allowExpensiveQueries = allowExpensiveQueries;
this.expressionResolver = expressionResolver;
Expand Down
42 changes: 7 additions & 35 deletions server/src/main/java/org/elasticsearch/index/IndexingSlowLog.java
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ public final class IndexingSlowLog implements IndexingOperationListener {
* <em>characters</em> of the source.
*/
private int maxSourceCharsToLog;
private final SlowLogFieldProvider slowLogFieldProvider;
private final SlowLogFields slowLogFields;

/**
* Reads how much of the source to log. The user can specify any value they
Expand All @@ -125,8 +125,8 @@ public final class IndexingSlowLog implements IndexingOperationListener {
Property.IndexScope
);

IndexingSlowLog(IndexSettings indexSettings, SlowLogFieldProvider slowLogFieldProvider) {
this.slowLogFieldProvider = slowLogFieldProvider;
IndexingSlowLog(IndexSettings indexSettings, SlowLogFields slowLogFields) {
this.slowLogFields = slowLogFields;
this.index = indexSettings.getIndex();

indexSettings.getScopedSettings().addSettingsUpdateConsumer(INDEX_INDEXING_SLOWLOG_REFORMAT_SETTING, this::setReformat);
Expand Down Expand Up @@ -179,47 +179,19 @@ public void postIndex(ShardId shardId, Engine.Index indexOperation, Engine.Index
final long tookInNanos = result.getTook();
if (indexWarnThreshold >= 0 && tookInNanos > indexWarnThreshold) {
indexLogger.warn(
IndexingSlowLogMessage.of(
this.slowLogFieldProvider.indexSlowLogFields(),
index,
doc,
tookInNanos,
reformat,
maxSourceCharsToLog
)
IndexingSlowLogMessage.of(this.slowLogFields.indexFields(), index, doc, tookInNanos, reformat, maxSourceCharsToLog)
);
} else if (indexInfoThreshold >= 0 && tookInNanos > indexInfoThreshold) {
indexLogger.info(
IndexingSlowLogMessage.of(
this.slowLogFieldProvider.indexSlowLogFields(),
index,
doc,
tookInNanos,
reformat,
maxSourceCharsToLog
)
IndexingSlowLogMessage.of(this.slowLogFields.indexFields(), index, doc, tookInNanos, reformat, maxSourceCharsToLog)
);
} else if (indexDebugThreshold >= 0 && tookInNanos > indexDebugThreshold) {
indexLogger.debug(
IndexingSlowLogMessage.of(
this.slowLogFieldProvider.indexSlowLogFields(),
index,
doc,
tookInNanos,
reformat,
maxSourceCharsToLog
)
IndexingSlowLogMessage.of(this.slowLogFields.indexFields(), index, doc, tookInNanos, reformat, maxSourceCharsToLog)
);
} else if (indexTraceThreshold >= 0 && tookInNanos > indexTraceThreshold) {
indexLogger.trace(
IndexingSlowLogMessage.of(
this.slowLogFieldProvider.indexSlowLogFields(),
index,
doc,
tookInNanos,
reformat,
maxSourceCharsToLog
)
IndexingSlowLogMessage.of(this.slowLogFields.indexFields(), index, doc, tookInNanos, reformat, maxSourceCharsToLog)
);
}
}
Expand Down
23 changes: 11 additions & 12 deletions server/src/main/java/org/elasticsearch/index/SearchSlowLog.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public final class SearchSlowLog implements SearchOperationListener {
private static final Logger queryLogger = LogManager.getLogger(INDEX_SEARCH_SLOWLOG_PREFIX + ".query");
private static final Logger fetchLogger = LogManager.getLogger(INDEX_SEARCH_SLOWLOG_PREFIX + ".fetch");

private final SlowLogFieldProvider slowLogFieldProvider;
private final SlowLogFields slowLogFields;

public static final Setting<Boolean> INDEX_SEARCH_SLOWLOG_INCLUDE_USER_SETTING = Setting.boolSetting(
INDEX_SEARCH_SLOWLOG_PREFIX + ".include.user",
Expand Down Expand Up @@ -126,9 +126,8 @@ public final class SearchSlowLog implements SearchOperationListener {

private static final ToXContent.Params FORMAT_PARAMS = new ToXContent.MapParams(Collections.singletonMap("pretty", "false"));

public SearchSlowLog(IndexSettings indexSettings, SlowLogFieldProvider slowLogFieldProvider) {
slowLogFieldProvider.init(indexSettings);
this.slowLogFieldProvider = slowLogFieldProvider;
public SearchSlowLog(IndexSettings indexSettings, SlowLogFields slowLogFields) {
this.slowLogFields = slowLogFields;
indexSettings.getScopedSettings()
.addSettingsUpdateConsumer(INDEX_SEARCH_SLOWLOG_THRESHOLD_QUERY_WARN_SETTING, this::setQueryWarnThreshold);
this.queryWarnThreshold = indexSettings.getValue(INDEX_SEARCH_SLOWLOG_THRESHOLD_QUERY_WARN_SETTING).nanos();
Expand Down Expand Up @@ -159,26 +158,26 @@ public SearchSlowLog(IndexSettings indexSettings, SlowLogFieldProvider slowLogFi
@Override
public void onQueryPhase(SearchContext context, long tookInNanos) {
if (queryWarnThreshold >= 0 && tookInNanos > queryWarnThreshold) {
queryLogger.warn(SearchSlowLogMessage.of(this.slowLogFieldProvider.searchSlowLogFields(), context, tookInNanos));
queryLogger.warn(SearchSlowLogMessage.of(this.slowLogFields.searchFields(), context, tookInNanos));
} else if (queryInfoThreshold >= 0 && tookInNanos > queryInfoThreshold) {
queryLogger.info(SearchSlowLogMessage.of(this.slowLogFieldProvider.searchSlowLogFields(), context, tookInNanos));
queryLogger.info(SearchSlowLogMessage.of(this.slowLogFields.searchFields(), context, tookInNanos));
} else if (queryDebugThreshold >= 0 && tookInNanos > queryDebugThreshold) {
queryLogger.debug(SearchSlowLogMessage.of(this.slowLogFieldProvider.searchSlowLogFields(), context, tookInNanos));
queryLogger.debug(SearchSlowLogMessage.of(this.slowLogFields.searchFields(), context, tookInNanos));
} else if (queryTraceThreshold >= 0 && tookInNanos > queryTraceThreshold) {
queryLogger.trace(SearchSlowLogMessage.of(this.slowLogFieldProvider.searchSlowLogFields(), context, tookInNanos));
queryLogger.trace(SearchSlowLogMessage.of(this.slowLogFields.searchFields(), context, tookInNanos));
}
}

@Override
public void onFetchPhase(SearchContext context, long tookInNanos) {
if (fetchWarnThreshold >= 0 && tookInNanos > fetchWarnThreshold) {
fetchLogger.warn(SearchSlowLogMessage.of(this.slowLogFieldProvider.searchSlowLogFields(), context, tookInNanos));
fetchLogger.warn(SearchSlowLogMessage.of(this.slowLogFields.searchFields(), context, tookInNanos));
} else if (fetchInfoThreshold >= 0 && tookInNanos > fetchInfoThreshold) {
fetchLogger.info(SearchSlowLogMessage.of(this.slowLogFieldProvider.searchSlowLogFields(), context, tookInNanos));
fetchLogger.info(SearchSlowLogMessage.of(this.slowLogFields.searchFields(), context, tookInNanos));
} else if (fetchDebugThreshold >= 0 && tookInNanos > fetchDebugThreshold) {
fetchLogger.debug(SearchSlowLogMessage.of(this.slowLogFieldProvider.searchSlowLogFields(), context, tookInNanos));
fetchLogger.debug(SearchSlowLogMessage.of(this.slowLogFields.searchFields(), context, tookInNanos));
} else if (fetchTraceThreshold >= 0 && tookInNanos > fetchTraceThreshold) {
fetchLogger.trace(SearchSlowLogMessage.of(this.slowLogFieldProvider.searchSlowLogFields(), context, tookInNanos));
fetchLogger.trace(SearchSlowLogMessage.of(this.slowLogFields.searchFields(), context, tookInNanos));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,28 +9,14 @@

package org.elasticsearch.index;

import java.util.Map;

/**
* Interface for providing additional fields to the slow log from a plugin.
* Intended to be loaded through SPI.
*/
public interface SlowLogFieldProvider {
/**
* Initialize field provider with index level settings to be able to listen for updates and set initial values
* Create a field provider with index level settings to be able to listen for updates and set initial values
* @param indexSettings settings for the index
*/
void init(IndexSettings indexSettings);

/**
* Slow log fields for indexing events
* @return map of field name to value
*/
Map<String, String> indexSlowLogFields();

/**
* Slow log fields for search events
* @return map of field name to value
*/
Map<String, String> searchSlowLogFields();
SlowLogFields create(IndexSettings indexSettings);
}
30 changes: 30 additions & 0 deletions server/src/main/java/org/elasticsearch/index/SlowLogFields.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.index;

import java.util.Map;

/**
* Fields for the slow log. These may be different each call depending on the state of the system.
*/
public interface SlowLogFields {

/**
* Slow log fields for indexing events
* @return map of field name to value
*/
Map<String, String> indexFields();

/**
* Slow log fields for search events
* @return map of field name to value
*/
Map<String, String> searchFields();
}
31 changes: 4 additions & 27 deletions server/src/main/java/org/elasticsearch/indices/IndicesService.java
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,7 @@ public class IndicesService extends AbstractLifecycleComponent
private final MapperMetrics mapperMetrics;
private final List<SearchOperationListener> searchOperationListeners;
private final QueryRewriteInterceptor queryRewriteInterceptor;
final SlowLogFieldProvider slowLogFieldProvider; // pkg-private for testingå

@Override
protected void doStart() {
Expand Down Expand Up @@ -385,6 +386,7 @@ public void onRemoval(ShardId shardId, String fieldName, boolean wasEvicted, lon

this.timestampFieldMapperService = new TimestampFieldMapperService(settings, threadPool, this);
this.searchOperationListeners = builder.searchOperationListener;
this.slowLogFieldProvider = builder.slowLogFieldProvider;
}

private static final String DANGLING_INDICES_UPDATE_THREAD_NAME = "DanglingIndices#updateTask";
Expand Down Expand Up @@ -755,7 +757,7 @@ private synchronized IndexService createIndexService(
() -> allowExpensiveQueries,
indexNameExpressionResolver,
recoveryStateFactories,
loadSlowLogFieldProvider(),
slowLogFieldProvider,
mapperMetrics,
searchOperationListeners
);
Expand Down Expand Up @@ -835,7 +837,7 @@ public synchronized MapperService createIndexMapperServiceForValidation(IndexMet
() -> allowExpensiveQueries,
indexNameExpressionResolver,
recoveryStateFactories,
loadSlowLogFieldProvider(),
slowLogFieldProvider,
mapperMetrics,
searchOperationListeners
);
Expand Down Expand Up @@ -1437,31 +1439,6 @@ int numPendingDeletes(Index index) {
}
}

// pkg-private for testing
SlowLogFieldProvider loadSlowLogFieldProvider() {
List<? extends SlowLogFieldProvider> slowLogFieldProviders = pluginsService.loadServiceProviders(SlowLogFieldProvider.class);
return new SlowLogFieldProvider() {
@Override
public void init(IndexSettings indexSettings) {
slowLogFieldProviders.forEach(provider -> provider.init(indexSettings));
}

@Override
public Map<String, String> indexSlowLogFields() {
return slowLogFieldProviders.stream()
.flatMap(provider -> provider.indexSlowLogFields().entrySet().stream())
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}

@Override
public Map<String, String> searchSlowLogFields() {
return slowLogFieldProviders.stream()
.flatMap(provider -> provider.searchSlowLogFields().entrySet().stream())
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
};
}

/**
* Checks if all pending deletes have completed. Used by tests to ensure we don't check directory contents
* while deletion still ongoing. * The reason is that, on Windows, browsing the directory contents can interfere
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import org.elasticsearch.features.FeatureService;
import org.elasticsearch.gateway.MetaStateService;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.SlowLogFieldProvider;
import org.elasticsearch.index.SlowLogFields;
import org.elasticsearch.index.analysis.AnalysisRegistry;
import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.index.mapper.MapperMetrics;
Expand Down Expand Up @@ -79,6 +81,22 @@ public class IndicesServiceBuilder {
MapperMetrics mapperMetrics;
List<SearchOperationListener> searchOperationListener = List.of();
QueryRewriteInterceptor queryRewriteInterceptor = null;
SlowLogFieldProvider slowLogFieldProvider = new SlowLogFieldProvider() {
@Override
public SlowLogFields create(IndexSettings indexSettings) {
return new SlowLogFields() {
@Override
public Map<String, String> indexFields() {
return Map.of();
}

@Override
public Map<String, String> searchFields() {
return Map.of();
}
};
}
};

public IndicesServiceBuilder settings(Settings settings) {
this.settings = settings;
Expand Down Expand Up @@ -191,6 +209,11 @@ public IndicesServiceBuilder searchOperationListeners(List<SearchOperationListen
return this;
}

public IndicesServiceBuilder slowLogFieldProvider(SlowLogFieldProvider slowLogFieldProvider) {
this.slowLogFieldProvider = slowLogFieldProvider;
return this;
}

public IndicesService build() {
Objects.requireNonNull(settings);
Objects.requireNonNull(pluginsService);
Expand All @@ -216,6 +239,7 @@ public IndicesService build() {
Objects.requireNonNull(snapshotCommitSuppliers);
Objects.requireNonNull(mapperMetrics);
Objects.requireNonNull(searchOperationListener);
Objects.requireNonNull(slowLogFieldProvider);

// collect engine factory providers from plugins
engineFactoryProviders = pluginsService.filterPlugins(EnginePlugin.class)
Expand Down
28 changes: 28 additions & 0 deletions server/src/main/java/org/elasticsearch/node/NodeConstruction.java
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@
import org.elasticsearch.index.IndexSettingProvider;
import org.elasticsearch.index.IndexSettingProviders;
import org.elasticsearch.index.IndexingPressure;
import org.elasticsearch.index.SlowLogFieldProvider;
import org.elasticsearch.index.SlowLogFields;
import org.elasticsearch.index.analysis.AnalysisRegistry;
import org.elasticsearch.index.mapper.MapperMetrics;
import org.elasticsearch.index.mapper.SourceFieldMetrics;
Expand Down Expand Up @@ -808,6 +810,31 @@ private void construct(
new ShardSearchPhaseAPMMetrics(telemetryProvider.getMeterRegistry())
);

List<? extends SlowLogFieldProvider> slowLogFieldProviders = pluginsService.loadServiceProviders(SlowLogFieldProvider.class);
// NOTE: the response of index/search slow log fields below must be calculated dynamically on every call
// because the responses may change dynamically at runtime
SlowLogFieldProvider slowLogFieldProvider = indexSettings -> {
final List<SlowLogFields> fields = new ArrayList<>();
for (var provider : slowLogFieldProviders) {
fields.add(provider.create(indexSettings));
}
return new SlowLogFields() {
@Override
public Map<String, String> indexFields() {
return fields.stream()
.flatMap(f -> f.indexFields().entrySet().stream())
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}

@Override
public Map<String, String> searchFields() {
return fields.stream()
.flatMap(f -> f.searchFields().entrySet().stream())
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
};
};

IndicesService indicesService = new IndicesServiceBuilder().settings(settings)
.pluginsService(pluginsService)
.nodeEnvironment(nodeEnvironment)
Expand All @@ -829,6 +856,7 @@ private void construct(
.requestCacheKeyDifferentiator(searchModule.getRequestCacheKeyDifferentiator())
.mapperMetrics(mapperMetrics)
.searchOperationListeners(searchOperationListeners)
.slowLogFieldProvider(slowLogFieldProvider)
.build();

final var parameters = new IndexSettingProvider.Parameters(clusterService, indicesService::createIndexMapperServiceForValidation);
Expand Down
Loading

0 comments on commit 3038cb5

Please sign in to comment.