Skip to content

Commit

Permalink
Concurrent Searching (Experimental)
Browse files Browse the repository at this point in the history
Signed-off-by: Andriy Redko <[email protected]>
  • Loading branch information
reta committed Mar 23, 2022
1 parent db7f4b0 commit 491a78f
Show file tree
Hide file tree
Showing 38 changed files with 5,568 additions and 171 deletions.
42 changes: 42 additions & 0 deletions sandbox/plugins/concurrent-search/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
apply plugin: 'opensearch.opensearchplugin'
apply plugin: 'opensearch.yaml-rest-test'

opensearchplugin {
name 'concurrent-search'
description 'The experimental plugin which implements concurrent search over Apache Lucene segments'
classname 'org.opensearch.search.ConcurrentSegmentSearchPlugin'
licenseFile rootProject.file('licenses/APACHE-LICENSE-2.0.txt')
noticeFile rootProject.file('NOTICE.txt')
}

yamlRestTest.enabled = false;
testingConventions.enabled = false;
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search;

import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.concurrent.OpenSearchExecutors;
import org.opensearch.plugins.Plugin;
import org.opensearch.plugins.SearchPlugin;
import org.opensearch.search.query.ConcurrentQueryPhaseSearcher;
import org.opensearch.search.query.QueryPhaseSearcher;
import org.opensearch.threadpool.ExecutorBuilder;
import org.opensearch.threadpool.FixedExecutorBuilder;
import org.opensearch.threadpool.ThreadPool;

import java.util.Collections;
import java.util.List;
import java.util.Optional;

/**
* The experimental plugin which implements the concurrent search over Apache Lucene segments.
*/
public class ConcurrentSegmentSearchPlugin extends Plugin implements SearchPlugin {
private static final String INDEX_SEARCHER = "index_searcher";

/**
* Default constructor
*/
public ConcurrentSegmentSearchPlugin() {}

@Override
public Optional<QueryPhaseSearcher> getQueryPhaseSearcher() {
return Optional.of(new ConcurrentQueryPhaseSearcher());
}

@Override
public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
final int allocatedProcessors = OpenSearchExecutors.allocatedProcessors(settings);
return Collections.singletonList(
new FixedExecutorBuilder(settings, INDEX_SEARCHER, allocatedProcessors, 1000, "thread_pool." + INDEX_SEARCHER)
);
}

@Override
public Optional<ExecutorServiceProvider> getIndexSearcherExecutorProvider() {
return Optional.of((ThreadPool threadPool) -> threadPool.executor(INDEX_SEARCHER));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/**
* The implementation of the experimental plugin which implements the concurrent search over Apache Lucene segments.
*/
package org.opensearch.search;
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search.query;

import static org.opensearch.search.query.TopDocsCollectorContext.createTopDocsCollectorContext;

import java.io.IOException;
import java.util.LinkedList;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.search.Collector;
import org.apache.lucene.search.CollectorManager;
import org.apache.lucene.search.Query;
import org.opensearch.search.internal.ContextIndexSearcher;
import org.opensearch.search.internal.SearchContext;
import org.opensearch.search.profile.query.ProfileCollectorManager;
import org.opensearch.search.query.QueryPhase.DefaultQueryPhaseSearcher;
import org.opensearch.search.query.QueryPhase.TimeExceededException;

/**
* The implementation of the {@link QueryPhaseSearcher} which attempts to use concurrent
* search of Apache Lucene segments if it has been enabled.
*/
public class ConcurrentQueryPhaseSearcher extends DefaultQueryPhaseSearcher {
private static final Logger LOGGER = LogManager.getLogger(ConcurrentQueryPhaseSearcher.class);

/**
* Default constructor
*/
public ConcurrentQueryPhaseSearcher() {}

@Override
protected boolean searchWithCollector(
SearchContext searchContext,
ContextIndexSearcher searcher,
Query query,
LinkedList<QueryCollectorContext> collectors,
boolean hasFilterCollector,
boolean hasTimeout
) throws IOException {
boolean couldUseConcurrentSegmentSearch = allowConcurrentSegmentSearch(searcher);

// TODO: support aggregations
if (searchContext.aggregations() != null) {
couldUseConcurrentSegmentSearch = false;
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Unable to use concurrent search over index segments (experimental): aggregations are present");
}
}

if (couldUseConcurrentSegmentSearch) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Using concurrent search over index segments (experimental)");
}

return searchWithCollectorManager(searchContext, searcher, query, collectors, hasFilterCollector, hasTimeout);
} else {
return super.searchWithCollector(searchContext, searcher, query, collectors, hasFilterCollector, hasTimeout);
}
}

private static boolean searchWithCollectorManager(
SearchContext searchContext,
ContextIndexSearcher searcher,
Query query,
LinkedList<QueryCollectorContext> collectorContexts,
boolean hasFilterCollector,
boolean timeoutSet
) throws IOException {
// create the top docs collector last when the other collectors are known
final TopDocsCollectorContext topDocsFactory = createTopDocsCollectorContext(searchContext, hasFilterCollector);
// add the top docs collector, the first collector context in the chain
collectorContexts.addFirst(topDocsFactory);

final QuerySearchResult queryResult = searchContext.queryResult();
final CollectorManager<?, ReduceableSearchResult> collectorManager;

// TODO: support aggregations in concurrent segment search flow
if (searchContext.aggregations() != null) {
throw new UnsupportedOperationException("The concurrent segment search does not support aggregations yet");
}

if (searchContext.getProfilers() != null) {
final ProfileCollectorManager<? extends Collector, ReduceableSearchResult> profileCollectorManager =
QueryCollectorManagerContext.createQueryCollectorManagerWithProfiler(collectorContexts);
searchContext.getProfilers().getCurrentQueryProfiler().setCollector(profileCollectorManager);
collectorManager = profileCollectorManager;
} else {
// Create multi collector manager instance
collectorManager = QueryCollectorManagerContext.createMultiCollectorManager(collectorContexts);
}

try {
final ReduceableSearchResult result = searcher.search(query, collectorManager);
result.reduce(queryResult);
} catch (EarlyTerminatingCollector.EarlyTerminationException e) {
queryResult.terminatedEarly(true);
} catch (TimeExceededException e) {
assert timeoutSet : "TimeExceededException thrown even though timeout wasn't set";
if (searchContext.request().allowPartialSearchResults() == false) {
// Can't rethrow TimeExceededException because not serializable
throw new QueryPhaseExecutionException(searchContext.shardTarget(), "Time exceeded");
}
queryResult.searchTimedOut(true);
}
if (searchContext.terminateAfter() != SearchContext.DEFAULT_TERMINATE_AFTER && queryResult.terminatedEarly() == null) {
queryResult.terminatedEarly(false);
}

return topDocsFactory.shouldRescore();
}

private static boolean allowConcurrentSegmentSearch(final ContextIndexSearcher searcher) {
return (searcher.getExecutor() != null);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/**
* {@link org.opensearch.search.query.QueryPhaseSearcher} implementation for concurrent search
*/
package org.opensearch.search.query;
Loading

0 comments on commit 491a78f

Please sign in to comment.