Skip to content

Commit

Permalink
Enable index-time sorting (#24055)
Browse files Browse the repository at this point in the history
This change adds an index setting to define how the documents should be sorted inside each Segment.
It allows any numeric, date, boolean or keyword field inside a mapping to be used to sort the index on disk.
It is not allowed to use a `nested` fields inside an index that defines an index sorting since `nested` fields relies on the original sort of the index.
This change does not add early termination capabilities in the search layer. This will be added in a follow up.

Relates #6720
  • Loading branch information
jimczi authored Apr 19, 2017
1 parent c0ac50e commit f05af0a
Show file tree
Hide file tree
Showing 28 changed files with 1,313 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@

package org.elasticsearch.action.admin.indices.segments;

import org.apache.lucene.search.Sort;
import org.apache.lucene.search.SortField;
import org.apache.lucene.search.SortedNumericSortField;
import org.apache.lucene.search.SortedSetSortField;
import org.apache.lucene.util.Accountable;
import org.elasticsearch.action.ShardOperationFailedException;
import org.elasticsearch.action.support.broadcast.BroadcastResponse;
Expand All @@ -37,6 +41,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Locale;

public class IndicesSegmentResponse extends BroadcastResponse implements ToXContent {

Expand Down Expand Up @@ -140,6 +145,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (segment.getMergeId() != null) {
builder.field(Fields.MERGE_ID, segment.getMergeId());
}
if (segment.getSegmentSort() != null) {
toXContent(builder, segment.getSegmentSort());
}
if (segment.ramTree != null) {
builder.startArray(Fields.RAM_TREE);
for (Accountable child : segment.ramTree.getChildResources()) {
Expand All @@ -164,6 +172,25 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
return builder;
}

static void toXContent(XContentBuilder builder, Sort sort) throws IOException {
builder.startArray("sort");
for (SortField field : sort.getSort()) {
builder.startObject();
builder.field("field", field.getField());
if (field instanceof SortedNumericSortField) {
builder.field("mode", ((SortedNumericSortField) field).getSelector()
.toString().toLowerCase(Locale.ROOT));
} else if (field instanceof SortedSetSortField) {
builder.field("mode", ((SortedSetSortField) field).getSelector()
.toString().toLowerCase(Locale.ROOT));
}
builder.field("missing", field.getMissingValue());
builder.field("reverse", field.getReverse());
builder.endObject();
}
builder.endArray();
}

static void toXContent(XContentBuilder builder, Accountable tree) throws IOException {
builder.startObject();
builder.field(Fields.DESCRIPTION, tree.toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ public ActionRequestValidationException validate() {
if (shrinkIndexRequest == null) {
validationException = addValidationError("shrink index request is missing", validationException);
}
if (shrinkIndexRequest.settings().getByPrefix("index.sort.").isEmpty() == false) {
validationException = addValidationError("can't override index sort when shrinking index", validationException);
}
return validationException;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -374,9 +374,18 @@ public ClusterState execute(ClusterState currentState) throws Exception {
throw e;
}

if (request.shrinkFrom() == null) {
// now that the mapping is merged we can validate the index sort.
// we cannot validate for index shrinking since the mapping is empty
// at this point. The validation will take place later in the process
// (when all shards are copied in a single place).
indexService.getIndexSortSupplier().get();
}

// the context is only used for validation so it's fine to pass fake values for the shard id and the current
// timestamp
final QueryShardContext queryShardContext = indexService.newQueryShardContext(0, null, () -> 0L);

for (Alias alias : request.aliases()) {
if (Strings.hasLength(alias.filter())) {
aliasValidator.validateAliasFilter(alias.name(), alias.filter(), queryShardContext, xContentRegistry);
Expand Down Expand Up @@ -581,22 +590,23 @@ static List<String> validateShrinkIndex(ClusterState state, String sourceIndex,

static void prepareShrinkIndexSettings(ClusterState currentState, Set<String> mappingKeys, Settings.Builder indexSettingsBuilder, Index shrinkFromIndex, String shrinkIntoName) {
final IndexMetaData sourceMetaData = currentState.metaData().index(shrinkFromIndex.getName());

final List<String> nodesToAllocateOn = validateShrinkIndex(currentState, shrinkFromIndex.getName(),
mappingKeys, shrinkIntoName, indexSettingsBuilder.build());
final Predicate<String> analysisSimilarityPredicate = (s) -> s.startsWith("index.similarity.")
|| s.startsWith("index.analysis.");
final Predicate<String> sourceSettingsPredicate = (s) -> s.startsWith("index.similarity.")
|| s.startsWith("index.analysis.") || s.startsWith("index.sort.");
indexSettingsBuilder
// we use "i.r.a.initial_recovery" rather than "i.r.a.require|include" since we want the replica to allocate right away
// once we are allocated.
.put("index.routing.allocation.initial_recovery._id",
Strings.arrayToCommaDelimitedString(nodesToAllocateOn.toArray()))
// we only try once and then give up with a shrink index
.put("index.allocation.max_retries", 1)
// now copy all similarity / analysis settings - this overrides all settings from the user unless they
// now copy all similarity / analysis / sort settings - this overrides all settings from the user unless they
// wanna add extra settings
.put(IndexMetaData.SETTING_VERSION_CREATED, sourceMetaData.getCreationVersion())
.put(IndexMetaData.SETTING_VERSION_UPGRADED, sourceMetaData.getUpgradedVersion())
.put(sourceMetaData.getSettings().filter(analysisSimilarityPredicate))
.put(sourceMetaData.getSettings().filter(sourceSettingsPredicate))
.put(IndexMetaData.SETTING_ROUTING_PARTITION_SIZE, sourceMetaData.getRoutingPartitionSize())
.put(IndexMetaData.INDEX_SHRINK_SOURCE_NAME.getKey(), shrinkFromIndex.getName())
.put(IndexMetaData.INDEX_SHRINK_SOURCE_UUID.getKey(), shrinkFromIndex.getUUID());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@
*/
package org.elasticsearch.common.settings;

import org.elasticsearch.index.IndexSortConfig;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.gateway.PrimaryShardAllocator;
import org.elasticsearch.index.IndexModule;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.IndexingSlowLog;
Expand Down Expand Up @@ -100,6 +100,10 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
MergePolicyConfig.INDEX_MERGE_POLICY_MAX_MERGED_SEGMENT_SETTING,
MergePolicyConfig.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER_SETTING,
MergePolicyConfig.INDEX_MERGE_POLICY_RECLAIM_DELETES_WEIGHT_SETTING,
IndexSortConfig.INDEX_SORT_FIELD_SETTING,
IndexSortConfig.INDEX_SORT_ORDER_SETTING,
IndexSortConfig.INDEX_SORT_MISSING_SETTING,
IndexSortConfig.INDEX_SORT_MODE_SETTING,
IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING,
IndexSettings.INDEX_WARMER_ENABLED_SETTING,
IndexSettings.INDEX_REFRESH_INTERVAL_SETTING,
Expand Down
29 changes: 23 additions & 6 deletions core/src/main/java/org/elasticsearch/index/IndexService.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
package org.elasticsearch.index;

import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.search.Sort;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.IOUtils;
Expand Down Expand Up @@ -84,6 +84,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

import static java.util.Collections.emptyMap;
import static java.util.Collections.unmodifiableMap;
Expand Down Expand Up @@ -119,6 +120,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
private final ScriptService scriptService;
private final ClusterService clusterService;
private final Client client;
private Supplier<Sort> indexSortSupplier;

public IndexService(IndexSettings indexSettings, NodeEnvironment nodeEnv,
NamedXContentRegistry xContentRegistry,
Expand Down Expand Up @@ -153,6 +155,16 @@ public IndexService(IndexSettings indexSettings, NodeEnvironment nodeEnv,
throw new IllegalArgumentException("Percolator queries are not allowed to use the current timestamp");
}));
this.indexFieldData = new IndexFieldDataService(indexSettings, indicesFieldDataCache, circuitBreakerService, mapperService);
if (indexSettings.getIndexSortConfig().hasIndexSort()) {
// we delay the actual creation of the sort order for this index because the mapping has not been merged yet.
// The sort order is validated right after the merge of the mapping later in the process.
this.indexSortSupplier = () -> indexSettings.getIndexSortConfig().buildIndexSort(
mapperService::fullName,
indexFieldData::getForField
);
} else {
this.indexSortSupplier = () -> null;
}
this.shardStoreDeleter = shardStoreDeleter;
this.bigArrays = bigArrays;
this.threadPool = threadPool;
Expand Down Expand Up @@ -243,6 +255,10 @@ public SimilarityService similarityService() {
return similarityService;
}

public Supplier<Sort> getIndexSortSupplier() {
return indexSortSupplier;
}

public synchronized void close(final String reason, boolean delete) throws IOException {
if (closed.compareAndSet(false, true)) {
deleted.compareAndSet(false, delete);
Expand Down Expand Up @@ -350,10 +366,10 @@ public synchronized IndexShard createShard(ShardRouting routing) throws IOExcept
};
store = new Store(shardId, this.indexSettings, indexStore.newDirectoryService(path), lock,
new StoreCloseListener(shardId, () -> eventListener.onStoreClosed(shardId)));
indexShard = new IndexShard(routing, this.indexSettings, path, store, indexCache, mapperService, similarityService,
indexFieldData, engineFactory, eventListener, searcherWrapper, threadPool, bigArrays, engineWarmer,
() -> globalCheckpointSyncer.accept(shardId),
searchOperationListeners, indexingOperationListeners);
indexShard = new IndexShard(routing, this.indexSettings, path, store, indexSortSupplier,
indexCache, mapperService, similarityService, indexFieldData, engineFactory,
eventListener, searcherWrapper, threadPool, bigArrays, engineWarmer,
() -> globalCheckpointSyncer.accept(shardId), searchOperationListeners, indexingOperationListeners);
eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");
eventListener.afterIndexShardCreated(indexShard);
shards = newMapBuilder(shards).put(shardId.id(), indexShard).immutableMap();
Expand Down Expand Up @@ -401,7 +417,8 @@ private void closeShard(String reason, ShardId sId, IndexShard indexShard, Store
final boolean flushEngine = deleted.get() == false && closed.get();
indexShard.close(reason, flushEngine);
} catch (Exception e) {
logger.debug((Supplier<?>) () -> new ParameterizedMessage("[{}] failed to close index shard", shardId), e);
logger.debug((org.apache.logging.log4j.util.Supplier<?>)
() -> new ParameterizedMessage("[{}] failed to close index shard", shardId), e);
// ignore
}
}
Expand Down
17 changes: 13 additions & 4 deletions core/src/main/java/org/elasticsearch/index/IndexSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -98,11 +98,11 @@ public final class IndexSettings {
Setting.intSetting("index.max_rescore_window", MAX_RESULT_WINDOW_SETTING, 1, Property.Dynamic, Property.IndexScope);
/**
* Index setting describing the maximum number of filters clauses that can be used
* in an adjacency_matrix aggregation. The max number of buckets produced by
* in an adjacency_matrix aggregation. The max number of buckets produced by
* N filters is (N*N)/2 so a limit of 100 filters is imposed by default.
*/
public static final Setting<Integer> MAX_ADJACENCY_MATRIX_FILTERS_SETTING =
Setting.intSetting("index.max_adjacency_matrix_filters", 100, 2, Property.Dynamic, Property.IndexScope);
Setting.intSetting("index.max_adjacency_matrix_filters", 100, 2, Property.Dynamic, Property.IndexScope);
public static final TimeValue DEFAULT_REFRESH_INTERVAL = new TimeValue(1, TimeUnit.SECONDS);
public static final Setting<TimeValue> INDEX_REFRESH_INTERVAL_SETTING =
Setting.timeSetting("index.refresh_interval", DEFAULT_REFRESH_INTERVAL, new TimeValue(-1, TimeUnit.MILLISECONDS),
Expand Down Expand Up @@ -176,6 +176,7 @@ public final class IndexSettings {
private volatile ByteSizeValue generationThresholdSize;
private final MergeSchedulerConfig mergeSchedulerConfig;
private final MergePolicyConfig mergePolicyConfig;
private final IndexSortConfig indexSortConfig;
private final IndexScopedSettings scopedSettings;
private long gcDeletesInMillis = DEFAULT_GC_DELETES.millis();
private volatile boolean warmerEnabled;
Expand Down Expand Up @@ -278,6 +279,7 @@ public IndexSettings(final IndexMetaData indexMetaData, final Settings nodeSetti
maxRefreshListeners = scopedSettings.get(MAX_REFRESH_LISTENERS_PER_SHARD);
maxSlicesPerScroll = scopedSettings.get(MAX_SLICES_PER_SCROLL);
this.mergePolicyConfig = new MergePolicyConfig(logger, this);
this.indexSortConfig = new IndexSortConfig(this);

scopedSettings.addSettingsUpdateConsumer(MergePolicyConfig.INDEX_COMPOUND_FORMAT_SETTING, mergePolicyConfig::setNoCFSRatio);
scopedSettings.addSettingsUpdateConsumer(MergePolicyConfig.INDEX_MERGE_POLICY_EXPUNGE_DELETES_ALLOWED_SETTING, mergePolicyConfig::setExpungeDeletesAllowed);
Expand Down Expand Up @@ -499,7 +501,7 @@ public int getMaxResultWindow() {
private void setMaxResultWindow(int maxResultWindow) {
this.maxResultWindow = maxResultWindow;
}

/**
* Returns the max number of filters in adjacency_matrix aggregation search requests
*/
Expand All @@ -509,7 +511,7 @@ public int getMaxAdjacencyMatrixFilters() {

private void setMaxAdjacencyMatrixFilters(int maxAdjacencyFilters) {
this.maxAdjacencyMatrixFilters = maxAdjacencyFilters;
}
}

/**
* Returns the maximum rescore window for search requests.
Expand Down Expand Up @@ -574,5 +576,12 @@ private void setMaxSlicesPerScroll(int value) {
this.maxSlicesPerScroll = value;
}

/**
* Returns the index sort config that should be used for this index.
*/
public IndexSortConfig getIndexSortConfig() {
return indexSortConfig;
}

public IndexScopedSettings getScopedSettings() { return scopedSettings;}
}
Loading

0 comments on commit f05af0a

Please sign in to comment.