Skip to content

Commit

Permalink
include SearchRequest in SearchContext and rename to SearchRequestOpe…
Browse files Browse the repository at this point in the history
…rationsCompositeListenerFactory
  • Loading branch information
ansjcy committed Jan 10, 2024
1 parent 0e731b8 commit 79caa4f
Show file tree
Hide file tree
Showing 16 changed files with 252 additions and 94 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Restore support for Java 8 for RestClient ([#11562](https://github.com/opensearch-project/OpenSearch/pull/11562))
- Add deleted doc count in _cat/shards ([#11678](https://github.com/opensearch-project/OpenSearch/pull/11678))
- Capture information for additional query types and aggregation types ([#11582](https://github.com/opensearch-project/OpenSearch/pull/11582))
- Added Support for dynamically adding SearchRequestOperationsListeners ([#11526](https://github.com/opensearch-project/OpenSearch/pull/11526))
- Added Support for dynamically adding SearchRequestOperationsListeners with SearchRequestOperationsCompositeListenerFactory ([#11526](https://github.com/opensearch-project/OpenSearch/pull/11526))

### Deprecated

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,11 @@

package org.opensearch.action.search;

import org.apache.logging.log4j.LogManager;
import org.apache.lucene.search.TotalHits;
import org.opensearch.common.annotation.InternalApi;

import java.util.EnumMap;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

Expand All @@ -31,25 +29,14 @@ class SearchRequestContext {
private TotalHits totalHits;
private final EnumMap<ShardStatsFieldNames, Integer> shardStats;

private final boolean phaseTookEnabled;
private final SearchRequest searchRequest;

/**
* This constructor is for testing only
*/
SearchRequestContext() {
this(new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()), false);
}

SearchRequestContext(SearchRequestOperationsListener searchRequestOperationsListener, boolean phaseTookEnabled) {
SearchRequestContext(SearchRequestOperationsListener searchRequestOperationsListener, SearchRequest searchRequest) {
this.searchRequestOperationsListener = searchRequestOperationsListener;
this.absoluteStartNanos = System.nanoTime();
this.phaseTookMap = new HashMap<>();
this.shardStats = new EnumMap<>(ShardStatsFieldNames.class);
this.phaseTookEnabled = phaseTookEnabled;
}

SearchRequestContext(SearchRequestOperationsListener searchRequestOperationsListener) {
this(searchRequestOperationsListener, false);
this.searchRequest = searchRequest;
}

SearchRequestOperationsListener getSearchRequestOperationsListener() {
Expand All @@ -65,7 +52,7 @@ Map<String, Long> phaseTookMap() {
}

SearchResponse.PhaseTook getPhaseTook() {
if (phaseTookEnabled) {
if (searchRequest != null && searchRequest.isPhaseTook() != null && searchRequest.isPhaseTook()) {
return new SearchResponse.PhaseTook(phaseTookMap);

Check warning on line 56 in server/src/main/java/org/opensearch/action/search/SearchRequestContext.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/search/SearchRequestContext.java#L56

Added line #L56 was not covered by tests
} else {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,25 +17,25 @@
import java.util.stream.Stream;

/**
* SearchRequestOperationsListeners contains listeners registered to search requests,
* SearchRequestOperationsCompositeListenerFactory contains listeners registered to search requests,
* and is responsible for creating the {@link SearchRequestOperationsListener.CompositeListener}
* with the all listeners enabled at cluster-level and request-level.
*
*
* @opensearch.internal
*/
public class SearchRequestOperationsListeners {
public final class SearchRequestOperationsCompositeListenerFactory {
private final List<SearchRequestOperationsListener> searchRequestListenersList;

/**
* Create the SearchRequestOperationsListeners and add multiple {@link SearchRequestOperationsListener}
* Create the SearchRequestOperationsCompositeListenerFactory and add multiple {@link SearchRequestOperationsListener}
* to the searchRequestListenersList.
* Those enabled listeners will be executed during each search request.
*
* @param listeners Multiple SearchRequestOperationsListener object to add.
* @throws IllegalArgumentException if any input listener is null.
*/
public SearchRequestOperationsListeners(SearchRequestOperationsListener... listeners) {
public SearchRequestOperationsCompositeListenerFactory(SearchRequestOperationsListener... listeners) {
searchRequestListenersList = new ArrayList<>();
for (SearchRequestOperationsListener listener : listeners) {
if (listener == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
private final NamedWriteableRegistry namedWriteableRegistry;
private final CircuitBreaker circuitBreaker;
private final SearchPipelineService searchPipelineService;
private final SearchRequestOperationsListeners searchRequestOperationsListeners;
private final SearchRequestOperationsCompositeListenerFactory searchRequestOperationsCompositeListenerFactory;

private volatile boolean searchQueryMetricsEnabled;

Expand All @@ -195,7 +195,7 @@ public TransportSearchAction(
NamedWriteableRegistry namedWriteableRegistry,
SearchPipelineService searchPipelineService,
MetricsRegistry metricsRegistry,
SearchRequestOperationsListeners searchRequestOperationsListeners
SearchRequestOperationsCompositeListenerFactory searchRequestOperationsCompositeListenerFactory
) {
super(SearchAction.NAME, transportService, actionFilters, (Writeable.Reader<SearchRequest>) SearchRequest::new);
this.client = client;
Expand All @@ -212,7 +212,7 @@ public TransportSearchAction(
this.searchPipelineService = searchPipelineService;
this.metricsRegistry = metricsRegistry;
this.searchQueryMetricsEnabled = clusterService.getClusterSettings().get(SEARCH_QUERY_METRICS_ENABLED_SETTING);
this.searchRequestOperationsListeners = searchRequestOperationsListeners;
this.searchRequestOperationsCompositeListenerFactory = searchRequestOperationsCompositeListenerFactory;
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(SEARCH_QUERY_METRICS_ENABLED_SETTING, this::setSearchQueryMetricsEnabled);
}
Expand Down Expand Up @@ -428,15 +428,12 @@ private void executeRequest(
relativeStartNanos,
System::nanoTime
);
final boolean phaseTookEnabled;
if (originalSearchRequest.isPhaseTook() == null) {
phaseTookEnabled = clusterService.getClusterSettings().get(SEARCH_PHASE_TOOK_ENABLED);
} else {
phaseTookEnabled = originalSearchRequest.isPhaseTook();
originalSearchRequest.setPhaseTook(clusterService.getClusterSettings().get(SEARCH_PHASE_TOOK_ENABLED));
}
SearchRequestOperationsListener.CompositeListener requestOperationsListeners = searchRequestOperationsListeners
SearchRequestOperationsListener.CompositeListener requestOperationsListeners = searchRequestOperationsCompositeListenerFactory
.buildCompositeListener(originalSearchRequest, logger);
SearchRequestContext searchRequestContext = new SearchRequestContext(requestOperationsListeners, phaseTookEnabled);
SearchRequestContext searchRequestContext = new SearchRequestContext(requestOperationsListeners, originalSearchRequest);
searchRequestContext.getSearchRequestOperationsListener().onRequestStart(searchRequestContext);

PipelinedRequest searchRequest;
Expand Down
23 changes: 12 additions & 11 deletions server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@
import org.opensearch.action.admin.cluster.snapshots.status.TransportNodesSnapshotsStatus;
import org.opensearch.action.search.SearchExecutionStatsCollector;
import org.opensearch.action.search.SearchPhaseController;
import org.opensearch.action.search.SearchRequestOperationsCompositeListenerFactory;
import org.opensearch.action.search.SearchRequestOperationsListener;
import org.opensearch.action.search.SearchRequestOperationsListeners;
import org.opensearch.action.search.SearchRequestSlowLog;
import org.opensearch.action.search.SearchRequestStats;
import org.opensearch.action.search.SearchTransportService;
Expand Down Expand Up @@ -881,15 +881,16 @@ protected Node(
)
.collect(Collectors.toList());

// register all standard SearchRequestOperationsListeners to the SearchRequestOperationsListeners
final SearchRequestOperationsListeners searchRequestOperationsListeners = new SearchRequestOperationsListeners(
Stream.concat(
Stream.of(searchRequestStats, searchRequestSlowLog),
pluginComponents.stream()
.filter(p -> p instanceof SearchRequestOperationsListener)
.map(p -> (SearchRequestOperationsListener) p)
).toArray(SearchRequestOperationsListener[]::new)
);
// register all standard SearchRequestOperationsCompositeListenerFactory to the SearchRequestOperationsCompositeListenerFactory
final SearchRequestOperationsCompositeListenerFactory searchRequestOperationsCompositeListenerFactory =
new SearchRequestOperationsCompositeListenerFactory(
Stream.concat(
Stream.of(searchRequestStats, searchRequestSlowLog),
pluginComponents.stream()
.filter(p -> p instanceof SearchRequestOperationsListener)
.map(p -> (SearchRequestOperationsListener) p)
).toArray(SearchRequestOperationsListener[]::new)
);

ActionModule actionModule = new ActionModule(
settings,
Expand Down Expand Up @@ -1287,7 +1288,7 @@ protected Node(
b.bind(RemoteClusterStateService.class).toProvider(() -> remoteClusterStateService);
b.bind(PersistedStateRegistry.class).toInstance(persistedStateRegistry);
b.bind(SegmentReplicationStatsTracker.class).toInstance(segmentReplicationStatsTracker);
b.bind(SearchRequestOperationsListeners.class).toInstance(searchRequestOperationsListeners);
b.bind(SearchRequestOperationsCompositeListenerFactory.class).toInstance(searchRequestOperationsCompositeListenerFactory);
});
injector = modules.createInjector();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

package org.opensearch.action.search;

import org.apache.logging.log4j.LogManager;
import org.opensearch.action.OriginalIndices;
import org.opensearch.action.support.IndicesOptions;
import org.opensearch.cluster.ClusterState;
Expand Down Expand Up @@ -177,7 +178,7 @@ private AbstractSearchAsyncAction<SearchPhaseResult> createAction(
results,
request.getMaxConcurrentShardRequests(),
SearchResponse.Clusters.EMPTY,
new SearchRequestContext()
new SearchRequestContext(new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()), request)
) {
@Override
protected SearchPhase getNextPhase(final SearchPhaseResults<SearchPhaseResult> results, SearchPhaseContext context) {
Expand Down Expand Up @@ -715,7 +716,10 @@ private SearchDfsQueryThenFetchAsyncAction createSearchDfsQueryThenFetchAsyncAct
null,
task,
SearchResponse.Clusters.EMPTY,
new SearchRequestContext(new SearchRequestOperationsListener.CompositeListener(searchRequestOperationsListeners, logger))
new SearchRequestContext(
new SearchRequestOperationsListener.CompositeListener(searchRequestOperationsListeners, logger),
searchRequest
)
);
}

Expand Down Expand Up @@ -765,7 +769,10 @@ private SearchQueryThenFetchAsyncAction createSearchQueryThenFetchAsyncAction(
null,
task,
SearchResponse.Clusters.EMPTY,
new SearchRequestContext(new SearchRequestOperationsListener.CompositeListener(searchRequestOperationsListeners, logger))
new SearchRequestContext(
new SearchRequestOperationsListener.CompositeListener(searchRequestOperationsListeners, logger),
searchRequest
)
) {
@Override
ShardSearchFailure[] buildShardFailures() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

package org.opensearch.action.search;

import org.apache.logging.log4j.LogManager;
import org.apache.lucene.util.BytesRef;
import org.opensearch.Version;
import org.opensearch.action.OriginalIndices;
Expand Down Expand Up @@ -137,7 +138,10 @@ public void run() throws IOException {
}
},
SearchResponse.Clusters.EMPTY,
new SearchRequestContext()
new SearchRequestContext(
new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()),
searchRequest
)
);

canMatchPhase.start();
Expand Down Expand Up @@ -229,7 +233,10 @@ public void run() throws IOException {
}
},
SearchResponse.Clusters.EMPTY,
new SearchRequestContext()
new SearchRequestContext(
new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()),
searchRequest
)
);

canMatchPhase.start();
Expand Down Expand Up @@ -320,7 +327,10 @@ public void sendCanMatch(
new ArraySearchPhaseResults<>(iter.size()),
randomIntBetween(1, 32),
SearchResponse.Clusters.EMPTY,
new SearchRequestContext()
new SearchRequestContext(
new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()),
searchRequest
)
) {

@Override
Expand Down Expand Up @@ -348,7 +358,10 @@ protected void executePhaseOnShard(
}
},
SearchResponse.Clusters.EMPTY,
new SearchRequestContext()
new SearchRequestContext(
new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()),
searchRequest
)
);

canMatchPhase.start();
Expand Down Expand Up @@ -433,7 +446,10 @@ public void run() {
}
},
SearchResponse.Clusters.EMPTY,
new SearchRequestContext()
new SearchRequestContext(
new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()),
searchRequest
)
);

canMatchPhase.start();
Expand Down Expand Up @@ -533,7 +549,10 @@ public void run() {
}
},
SearchResponse.Clusters.EMPTY,
new SearchRequestContext()
new SearchRequestContext(
new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()),
searchRequest
)
);

canMatchPhase.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

package org.opensearch.action.search;

import org.apache.logging.log4j.LogManager;
import org.opensearch.Version;
import org.opensearch.action.OriginalIndices;
import org.opensearch.cluster.ClusterState;
Expand Down Expand Up @@ -61,6 +62,7 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -136,7 +138,7 @@ public void testSkipSearchShards() throws InterruptedException {
new ArraySearchPhaseResults<>(shardsIter.size()),
request.getMaxConcurrentShardRequests(),
SearchResponse.Clusters.EMPTY,
new SearchRequestContext()
new SearchRequestContext(new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()), request)
) {

@Override
Expand Down Expand Up @@ -255,7 +257,7 @@ public void testLimitConcurrentShardRequests() throws InterruptedException {
new ArraySearchPhaseResults<>(shardsIter.size()),
request.getMaxConcurrentShardRequests(),
SearchResponse.Clusters.EMPTY,
new SearchRequestContext()
new SearchRequestContext(new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()), request)
) {

@Override
Expand Down Expand Up @@ -373,7 +375,7 @@ public void sendFreeContext(Transport.Connection connection, ShardSearchContextI
new ArraySearchPhaseResults<>(shardsIter.size()),
request.getMaxConcurrentShardRequests(),
SearchResponse.Clusters.EMPTY,
new SearchRequestContext()
new SearchRequestContext(new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()), request)
) {
TestSearchResponse response = new TestSearchResponse();

Expand Down Expand Up @@ -496,7 +498,7 @@ public void sendFreeContext(Transport.Connection connection, ShardSearchContextI
new ArraySearchPhaseResults<>(shardsIter.size()),
request.getMaxConcurrentShardRequests(),
SearchResponse.Clusters.EMPTY,
new SearchRequestContext()
new SearchRequestContext(new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()), request)
) {
TestSearchResponse response = new TestSearchResponse();

Expand Down Expand Up @@ -610,7 +612,7 @@ public void testAllowPartialResults() throws InterruptedException {
new ArraySearchPhaseResults<>(shardsIter.size()),
request.getMaxConcurrentShardRequests(),
SearchResponse.Clusters.EMPTY,
new SearchRequestContext()
new SearchRequestContext(new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()), request)
) {
@Override
protected void executePhaseOnShard(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

package org.opensearch.action.search;

import org.apache.logging.log4j.LogManager;
import org.apache.lucene.search.FieldDoc;
import org.apache.lucene.search.SortField;
import org.apache.lucene.search.TopFieldDocs;
Expand Down Expand Up @@ -63,6 +64,7 @@
import org.opensearch.transport.Transport;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
Expand Down Expand Up @@ -215,7 +217,10 @@ public void sendExecuteQuery(
null,
task,
SearchResponse.Clusters.EMPTY,
new SearchRequestContext()
new SearchRequestContext(
new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()),
searchRequest
)
) {
@Override
protected SearchPhase getNextPhase(SearchPhaseResults<SearchPhaseResult> results, SearchPhaseContext context) {
Expand Down
Loading

0 comments on commit 79caa4f

Please sign in to comment.