Skip to content

Commit

Permalink
Record timestamp field range in index metadata
Browse files Browse the repository at this point in the history
Queries including a filter by timestamp range are common in time-series
data. Moreover, older time-series indices are typically made read-only so
that the timestamp range becomes immutable. By recording in the index
metadata the range of timestamps covered by each index we can very
efficiently skip shards on the coordinating node, even if those shards
are not assigned.

This commit computes the timestamp range of immutable indices and
records it in the index metadata as the shards start for the first time.
Note that the only indices it considers immutable today are ones using
the `ReadOnlyEngine`, which includes frozen indices and searchable
snapshots but not regular indices with a write block.

Backport of #65564 and #65720
  • Loading branch information
DaveCTurner committed Dec 2, 2020
1 parent 981dd5d commit 2c8957c
Show file tree
Hide file tree
Showing 35 changed files with 1,683 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.cluster.action.shard;

import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
Expand All @@ -36,7 +37,9 @@
import org.elasticsearch.cluster.NotMasterException;
import org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.RerouteService;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
Expand All @@ -50,6 +53,9 @@
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.IndexLongFieldRange;
import org.elasticsearch.index.shard.ShardLongFieldRange;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.tasks.Task;
Expand All @@ -67,9 +73,11 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Predicate;
Expand Down Expand Up @@ -519,16 +527,23 @@ public int hashCode() {
/**
 * Reports that the given shard has started, using the cluster state currently returned by
 * {@code clusterService.state()}. Delegates to the overload that takes an explicit {@link ClusterState}.
 *
 * @param shardRouting         routing entry of the shard that has started
 * @param primaryTerm          primary term forwarded to the overload taking an explicit cluster state
 * @param message              message forwarded to the overload taking an explicit cluster state
 * @param timestampMillisRange timestamp range reported for this shard, forwarded unchanged
 * @param listener             listener forwarded to the overload taking an explicit cluster state
 */
public void shardStarted(final ShardRouting shardRouting,
final long primaryTerm,
final String message,
final ShardLongFieldRange timestampMillisRange,
final ActionListener<Void> listener) {
shardStarted(shardRouting, primaryTerm, message, listener, clusterService.state());
shardStarted(shardRouting, primaryTerm, message, timestampMillisRange, listener, clusterService.state());
}

/**
 * Reports that the given shard has started by wrapping its details in a {@link StartedShardEntry} and passing that
 * entry to {@code sendShardAction} under {@code SHARD_STARTED_ACTION_NAME}.
 *
 * @param shardRouting         routing entry of the started shard; supplies the shard id and the allocation id
 * @param primaryTerm          primary term recorded in the entry
 * @param message              message recorded in the entry
 * @param timestampMillisRange timestamp range of the shard recorded in the entry
 * @param listener             listener handed to {@code sendShardAction}
 * @param currentState         cluster state handed to {@code sendShardAction}
 */
public void shardStarted(final ShardRouting shardRouting,
final long primaryTerm,
final String message,
final ShardLongFieldRange timestampMillisRange,
final ActionListener<Void> listener,
final ClusterState currentState) {
StartedShardEntry entry = new StartedShardEntry(shardRouting.shardId(), shardRouting.allocationId().getId(), primaryTerm, message);
final StartedShardEntry entry = new StartedShardEntry(
shardRouting.shardId(),
shardRouting.allocationId().getId(),
primaryTerm,
message,
timestampMillisRange);
sendShardAction(SHARD_STARTED_ACTION_NAME, currentState, entry, listener);
}

Expand Down Expand Up @@ -578,6 +593,7 @@ public ClusterTasksResult<StartedShardEntry> execute(ClusterState currentState,
List<StartedShardEntry> tasksToBeApplied = new ArrayList<>();
List<ShardRouting> shardRoutingsToBeApplied = new ArrayList<>(tasks.size());
Set<ShardRouting> seenShardRoutings = new HashSet<>(); // to prevent duplicates
final Map<Index, IndexLongFieldRange> updatedTimestampRanges = new HashMap<>();
for (StartedShardEntry task : tasks) {
final ShardRouting matched = currentState.getRoutingTable().getByAllocationId(task.shardId, task.allocationId);
if (matched == null) {
Expand Down Expand Up @@ -618,6 +634,22 @@ public ClusterTasksResult<StartedShardEntry> execute(ClusterState currentState,
tasksToBeApplied.add(task);
shardRoutingsToBeApplied.add(matched);
seenShardRoutings.add(matched);

// expand the timestamp range recorded in the index metadata if needed
final Index index = task.shardId.getIndex();
IndexLongFieldRange currentTimestampMillisRange = updatedTimestampRanges.get(index);
final IndexMetadata indexMetadata = currentState.metadata().index(index);
if (currentTimestampMillisRange == null) {
currentTimestampMillisRange = indexMetadata.getTimestampMillisRange();
}
final IndexLongFieldRange newTimestampMillisRange;
newTimestampMillisRange = currentTimestampMillisRange.extendWithShardRange(
task.shardId.id(),
indexMetadata.getNumberOfShards(),
task.timestampMillisRange);
if (newTimestampMillisRange != currentTimestampMillisRange) {
updatedTimestampRanges.put(index, newTimestampMillisRange);
}
}
}
}
Expand All @@ -627,6 +659,19 @@ public ClusterTasksResult<StartedShardEntry> execute(ClusterState currentState,
ClusterState maybeUpdatedState = currentState;
try {
maybeUpdatedState = allocationService.applyStartedShards(currentState, shardRoutingsToBeApplied);

if (updatedTimestampRanges.isEmpty() == false) {
final Metadata.Builder metadataBuilder = Metadata.builder(maybeUpdatedState.metadata());
for (Map.Entry<Index, IndexLongFieldRange> updatedTimestampRangeEntry : updatedTimestampRanges.entrySet()) {
metadataBuilder.put(IndexMetadata
.builder(metadataBuilder.getSafe(updatedTimestampRangeEntry.getKey()))
.timestampMillisRange(updatedTimestampRangeEntry.getValue()));
}
maybeUpdatedState = ClusterState.builder(maybeUpdatedState).metadata(metadataBuilder).build();
}

assert assertStartedIndicesHaveCompleteTimestampRanges(maybeUpdatedState);

builder.successes(tasksToBeApplied);
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("failed to apply started shards {}", shardRoutingsToBeApplied), e);
Expand All @@ -636,6 +681,16 @@ public ClusterTasksResult<StartedShardEntry> execute(ClusterState currentState,
return builder.build(maybeUpdatedState);
}

/**
 * Assertion helper: for every index in the routing table whose primary shards are all active, asserts that the
 * timestamp range recorded in the index metadata is complete.
 *
 * @param clusterState the cluster state whose routing table and metadata are checked
 * @return always {@code true}, so that the call can itself be placed inside an {@code assert} statement
 */
private static boolean assertStartedIndicesHaveCompleteTimestampRanges(ClusterState clusterState) {
    for (ObjectObjectCursor<String, IndexRoutingTable> indexRouting : clusterState.getRoutingTable().getIndicesRouting()) {
        final String indexName = indexRouting.key;
        final IndexRoutingTable indexRoutingTable = indexRouting.value;
        // keep the shard check, the metadata lookup and the message inside the assert so that none of them is
        // evaluated when assertions are disabled
        assert indexRoutingTable.allPrimaryShardsActive() == false
            || clusterState.metadata().index(indexName).getTimestampMillisRange().isComplete()
            : incompleteTimestampRangeMessage(clusterState, indexName, indexRoutingTable);
    }
    return true;
}

/**
 * Builds the failure message used when an index with all primaries active has an incomplete timestamp range.
 */
private static String incompleteTimestampRangeMessage(ClusterState clusterState,
                                                      String indexName,
                                                      IndexRoutingTable indexRoutingTable) {
    return "index [" + indexName + "] should have complete timestamp range, but got "
        + clusterState.metadata().index(indexName).getTimestampMillisRange() + " for " + indexRoutingTable.prettyPrint();
}

@Override
public void onFailure(String source, Exception e) {
if (e instanceof FailedToCommitClusterStateException || e instanceof NotMasterException) {
Expand All @@ -658,6 +713,7 @@ public static class StartedShardEntry extends TransportRequest {
final String allocationId;
final long primaryTerm;
final String message;
final ShardLongFieldRange timestampMillisRange;

StartedShardEntry(StreamInput in) throws IOException {
super(in);
Expand All @@ -676,13 +732,19 @@ public static class StartedShardEntry extends TransportRequest {
final Exception ex = in.readException();
assert ex == null : "started shard must not have failure [" + ex + "]";
}
this.timestampMillisRange = ShardLongFieldRange.readFrom(in);
}

/**
 * Creates an entry describing a started shard; every argument is stored unchanged in the field of the same name.
 *
 * @param shardId              id of the shard that started
 * @param allocationId         allocation id of the started shard copy
 * @param primaryTerm          primary term associated with this entry
 * @param message              message associated with this entry
 * @param timestampMillisRange timestamp range reported for the shard
 */
public StartedShardEntry(final ShardId shardId, final String allocationId, final long primaryTerm, final String message) {
public StartedShardEntry(final ShardId shardId,
final String allocationId,
final long primaryTerm,
final String message,
final ShardLongFieldRange timestampMillisRange) {
this.shardId = shardId;
this.allocationId = allocationId;
this.primaryTerm = primaryTerm;
this.message = message;
this.timestampMillisRange = timestampMillisRange;
}

@Override
Expand All @@ -699,6 +761,7 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().before(Version.V_6_3_0)) {
out.writeException(null);
}
timestampMillisRange.writeTo(out);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.elasticsearch.index.Index;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.IndexLongFieldRange;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.rest.RestStatus;

Expand Down Expand Up @@ -340,6 +341,7 @@ public static APIBlock readFrom(StreamInput input) throws IOException {
static final String KEY_ALIASES = "aliases";
static final String KEY_ROLLOVER_INFOS = "rollover_info";
static final String KEY_SYSTEM = "system";
static final String KEY_TIMESTAMP_RANGE = "timestamp_range";
public static final String KEY_PRIMARY_TERMS = "primary_terms";

public static final String INDEX_STATE_FILE_PREFIX = "state-";
Expand Down Expand Up @@ -390,6 +392,8 @@ public static APIBlock readFrom(StreamInput input) throws IOException {
private final ImmutableOpenMap<String, RolloverInfo> rolloverInfos;
private final boolean isSystem;

private final IndexLongFieldRange timestampMillisRange;

private IndexMetadata(
final Index index,
final long version,
Expand All @@ -415,7 +419,8 @@ private IndexMetadata(
final int routingPartitionSize,
final ActiveShardCount waitForActiveShards,
final ImmutableOpenMap<String, RolloverInfo> rolloverInfos,
final boolean isSystem) {
final boolean isSystem,
final IndexLongFieldRange timestampMillisRange) {

this.index = index;
this.version = version;
Expand Down Expand Up @@ -448,6 +453,7 @@ private IndexMetadata(
this.waitForActiveShards = waitForActiveShards;
this.rolloverInfos = rolloverInfos;
this.isSystem = isSystem;
this.timestampMillisRange = timestampMillisRange;
assert numberOfShards * routingFactor == routingNumShards : routingNumShards + " must be a multiple of " + numberOfShards;
}

Expand Down Expand Up @@ -661,6 +667,10 @@ public DiscoveryNodeFilters excludeFilters() {
return excludeFilters;
}

/**
 * @return the timestamp range held by this index metadata
 */
public IndexLongFieldRange getTimestampMillisRange() {
    return this.timestampMillisRange;
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand Down Expand Up @@ -770,6 +780,7 @@ private static class IndexMetadataDiff implements Diff<IndexMetadata> {
private final Diff<ImmutableOpenIntMap<Set<String>>> inSyncAllocationIds;
private final Diff<ImmutableOpenMap<String, RolloverInfo>> rolloverInfos;
private final boolean isSystem;
private final IndexLongFieldRange timestampMillisRange;

IndexMetadataDiff(IndexMetadata before, IndexMetadata after) {
index = after.index.getName();
Expand All @@ -788,6 +799,7 @@ private static class IndexMetadataDiff implements Diff<IndexMetadata> {
DiffableUtils.getVIntKeySerializer(), DiffableUtils.StringSetValueSerializer.getInstance());
rolloverInfos = DiffableUtils.diff(before.rolloverInfos, after.rolloverInfos, DiffableUtils.getStringKeySerializer());
isSystem = after.isSystem;
timestampMillisRange = after.timestampMillisRange;
}

private static final DiffableUtils.DiffableValueReader<String, AliasMetadata> ALIAS_METADATA_DIFF_VALUE_READER =
Expand Down Expand Up @@ -833,6 +845,7 @@ private static class IndexMetadataDiff implements Diff<IndexMetadata> {
} else {
isSystem = false;
}
timestampMillisRange = IndexLongFieldRange.readFrom(in);
}

@Override
Expand Down Expand Up @@ -862,6 +875,7 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(SYSTEM_INDEX_FLAG_ADDED)) {
out.writeBoolean(isSystem);
}
timestampMillisRange.writeTo(out);
}

@Override
Expand All @@ -881,6 +895,7 @@ public IndexMetadata apply(IndexMetadata part) {
builder.inSyncAllocationIds.putAll(inSyncAllocationIds.apply(part.inSyncAllocationIds));
builder.rolloverInfos.putAll(rolloverInfos.apply(part.rolloverInfos));
builder.system(part.isSystem);
builder.timestampMillisRange(timestampMillisRange);
return builder.build();
}
}
Expand Down Expand Up @@ -938,6 +953,7 @@ public static IndexMetadata readFrom(StreamInput in) throws IOException {
if (in.getVersion().onOrAfter(SYSTEM_INDEX_FLAG_ADDED)) {
builder.system(in.readBoolean());
}
builder.timestampMillisRange(IndexLongFieldRange.readFrom(in));
return builder.build();
}

Expand Down Expand Up @@ -989,6 +1005,7 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(SYSTEM_INDEX_FLAG_ADDED)) {
out.writeBoolean(isSystem);
}
timestampMillisRange.writeTo(out);
}

public boolean isSystem() {
Expand Down Expand Up @@ -1020,6 +1037,7 @@ public static class Builder {
private final ImmutableOpenMap.Builder<String, RolloverInfo> rolloverInfos;
private Integer routingNumShards;
private boolean isSystem;
private IndexLongFieldRange timestampMillisRange = IndexLongFieldRange.NO_SHARDS;

public Builder(String index) {
this.index = index;
Expand Down Expand Up @@ -1047,6 +1065,7 @@ public Builder(IndexMetadata indexMetadata) {
this.inSyncAllocationIds = ImmutableOpenIntMap.builder(indexMetadata.inSyncAllocationIds);
this.rolloverInfos = ImmutableOpenMap.builder(indexMetadata.rolloverInfos);
this.isSystem = indexMetadata.isSystem;
this.timestampMillisRange = indexMetadata.timestampMillisRange;
}

public Builder index(String index) {
Expand Down Expand Up @@ -1255,6 +1274,15 @@ public boolean isSystem() {
return isSystem;
}

/**
 * Sets the timestamp range that the built index metadata will carry.
 *
 * @param timestampMillisRange the range to store in this builder
 * @return this builder, to allow call chaining
 */
public Builder timestampMillisRange(IndexLongFieldRange timestampMillisRange) {
this.timestampMillisRange = timestampMillisRange;
return this;
}

/**
 * @return the timestamp range currently held by this builder
 */
public IndexLongFieldRange getTimestampMillisRange() {
    return this.timestampMillisRange;
}

public IndexMetadata build() {
ImmutableOpenMap.Builder<String, AliasMetadata> tmpAliases = aliases;
Settings tmpSettings = settings;
Expand Down Expand Up @@ -1368,7 +1396,8 @@ public IndexMetadata build() {
routingPartitionSize,
waitForActiveShards,
rolloverInfos.build(),
isSystem);
isSystem,
timestampMillisRange);
}

public static void toXContent(IndexMetadata indexMetadata, XContentBuilder builder, ToXContent.Params params) throws IOException {
Expand Down Expand Up @@ -1469,6 +1498,10 @@ public static void toXContent(IndexMetadata indexMetadata, XContentBuilder build
builder.endObject();
builder.field(KEY_SYSTEM, indexMetadata.isSystem);

builder.startObject(KEY_TIMESTAMP_RANGE);
indexMetadata.timestampMillisRange.toXContent(builder, params);
builder.endObject();

builder.endObject();
}

Expand Down Expand Up @@ -1549,6 +1582,8 @@ public static IndexMetadata fromXContent(XContentParser parser) throws IOExcepti
// simply ignored when upgrading from 2.x
assert Version.CURRENT.major <= 5;
parser.skipChildren();
} else if (KEY_TIMESTAMP_RANGE.equals(currentFieldName)) {
builder.timestampMillisRange(IndexLongFieldRange.fromXContent(parser));
} else {
// assume it's custom index metadata
builder.putCustom(currentFieldName, parser.mapStrings());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.shard.IndexLongFieldRange;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.ShardLimitValidator;
Expand Down Expand Up @@ -769,6 +770,7 @@ static Tuple<ClusterState, Collection<IndexResult>> closeRoutingTable(final Clus
routingTable.remove(index.getName());
} else {
metadata.put(updatedMetadata
.timestampMillisRange(IndexLongFieldRange.NO_SHARDS)
.settingsVersion(indexMetadata.getSettingsVersion() + 1)
.settings(Settings.builder()
.put(indexMetadata.getSettings())
Expand Down Expand Up @@ -858,6 +860,7 @@ ClusterState openIndices(final Index[] indices, final ClusterState currentState)
.state(IndexMetadata.State.OPEN)
.settingsVersion(indexMetadata.getSettingsVersion() + 1)
.settings(updatedSettings)
.timestampMillisRange(IndexLongFieldRange.NO_SHARDS)
.build();

// The index might be closed because we couldn't import it due to old incompatible version
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,8 @@ private IndexMetadata.Builder updateInSyncAllocations(RoutingTable newRoutingTab
final String allocationId;
if (recoverySource == RecoverySource.ExistingStoreRecoverySource.FORCE_STALE_PRIMARY_INSTANCE) {
allocationId = RecoverySource.ExistingStoreRecoverySource.FORCED_ALLOCATION_ID;
indexMetadataBuilder.timestampMillisRange(indexMetadataBuilder.getTimestampMillisRange()
.removeShard(shardId.id(), oldIndexMetadata.getNumberOfShards()));
} else {
assert recoverySource instanceof RecoverySource.SnapshotRecoverySource : recoverySource;
allocationId = updates.initializedPrimary.allocationId().getId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.DocsStats;
import org.elasticsearch.index.shard.ShardLongFieldRange;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
Expand Down Expand Up @@ -1980,4 +1981,12 @@ public interface TranslogRecoveryRunner {
public enum HistorySource {
TRANSLOG, INDEX
}

/**
 * Returns the range of raw values of the given field in this shard.
 *
 * @param field the name of the field whose value range is requested
 * @return a {@link ShardLongFieldRange} containing the min and max raw values of the given field for this shard if the engine
 * guarantees that these values will never change, or {@link ShardLongFieldRange#EMPTY} if this field is empty, or
 * {@link ShardLongFieldRange#UNKNOWN} if this field's value range may change in future.
 * @throws IOException presumably if the underlying index cannot be read; the exact conditions are up to each implementation
 */
public abstract ShardLongFieldRange getRawFieldRange(String field) throws IOException;

}
Loading

0 comments on commit 2c8957c

Please sign in to comment.