diff --git a/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java b/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java index c8435296f2e06..461594390cd8d 100644 --- a/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java +++ b/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java @@ -19,6 +19,7 @@ package org.elasticsearch.cluster.action.shard; +import com.carrotsearch.hppc.cursors.ObjectObjectCursor; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; @@ -36,7 +37,9 @@ import org.elasticsearch.cluster.NotMasterException; import org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException; import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.RerouteService; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.allocation.AllocationService; @@ -50,6 +53,9 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.shard.IndexLongFieldRange; +import org.elasticsearch.index.shard.ShardLongFieldRange; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.node.NodeClosedException; import org.elasticsearch.tasks.Task; @@ -67,9 +73,11 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Locale; +import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.function.Predicate; @@ -519,16 +527,23 @@ public int hashCode() { public void shardStarted(final ShardRouting shardRouting, final long primaryTerm, final String message, + final ShardLongFieldRange timestampMillisRange, final ActionListener listener) { - shardStarted(shardRouting, primaryTerm, message, listener, clusterService.state()); + shardStarted(shardRouting, primaryTerm, message, timestampMillisRange, listener, clusterService.state()); } public void shardStarted(final ShardRouting shardRouting, final long primaryTerm, final String message, + final ShardLongFieldRange timestampMillisRange, final ActionListener listener, final ClusterState currentState) { - StartedShardEntry entry = new StartedShardEntry(shardRouting.shardId(), shardRouting.allocationId().getId(), primaryTerm, message); + final StartedShardEntry entry = new StartedShardEntry( + shardRouting.shardId(), + shardRouting.allocationId().getId(), + primaryTerm, + message, + timestampMillisRange); sendShardAction(SHARD_STARTED_ACTION_NAME, currentState, entry, listener); } @@ -578,6 +593,7 @@ public ClusterTasksResult execute(ClusterState currentState, List tasksToBeApplied = new ArrayList<>(); List shardRoutingsToBeApplied = new ArrayList<>(tasks.size()); Set seenShardRoutings = new HashSet<>(); // to prevent duplicates + final Map updatedTimestampRanges = new HashMap<>(); for (StartedShardEntry task : tasks) { final ShardRouting matched = currentState.getRoutingTable().getByAllocationId(task.shardId, task.allocationId); if (matched == null) { @@ -618,6 +634,22 @@ public ClusterTasksResult execute(ClusterState currentState, tasksToBeApplied.add(task); shardRoutingsToBeApplied.add(matched); seenShardRoutings.add(matched); + + // expand the timestamp range recorded in the index metadata if needed + final Index index = task.shardId.getIndex(); + IndexLongFieldRange currentTimestampMillisRange = updatedTimestampRanges.get(index); + final IndexMetadata indexMetadata = currentState.metadata().index(index); + if (currentTimestampMillisRange == null) { + currentTimestampMillisRange = indexMetadata.getTimestampMillisRange(); + } + final IndexLongFieldRange newTimestampMillisRange; + newTimestampMillisRange = currentTimestampMillisRange.extendWithShardRange( + task.shardId.id(), + indexMetadata.getNumberOfShards(), + task.timestampMillisRange); + if (newTimestampMillisRange != currentTimestampMillisRange) { + updatedTimestampRanges.put(index, newTimestampMillisRange); + } } } } @@ -627,6 +659,19 @@ public ClusterTasksResult execute(ClusterState currentState, ClusterState maybeUpdatedState = currentState; try { maybeUpdatedState = allocationService.applyStartedShards(currentState, shardRoutingsToBeApplied); + + if (updatedTimestampRanges.isEmpty() == false) { + final Metadata.Builder metadataBuilder = Metadata.builder(maybeUpdatedState.metadata()); + for (Map.Entry updatedTimestampRangeEntry : updatedTimestampRanges.entrySet()) { + metadataBuilder.put(IndexMetadata + .builder(metadataBuilder.getSafe(updatedTimestampRangeEntry.getKey())) + .timestampMillisRange(updatedTimestampRangeEntry.getValue())); + } + maybeUpdatedState = ClusterState.builder(maybeUpdatedState).metadata(metadataBuilder).build(); + } + + assert assertStartedIndicesHaveCompleteTimestampRanges(maybeUpdatedState); + builder.successes(tasksToBeApplied); } catch (Exception e) { logger.warn(() -> new ParameterizedMessage("failed to apply started shards {}", shardRoutingsToBeApplied), e); @@ -636,6 +681,16 @@ public ClusterTasksResult execute(ClusterState currentState, return builder.build(maybeUpdatedState); } + private static boolean assertStartedIndicesHaveCompleteTimestampRanges(ClusterState clusterState) { + for (ObjectObjectCursor cursor : clusterState.getRoutingTable().getIndicesRouting()) { + assert cursor.value.allPrimaryShardsActive() == false + || clusterState.metadata().index(cursor.key).getTimestampMillisRange().isComplete() + : "index [" + cursor.key + "] should have complete timestamp range, but got " + + clusterState.metadata().index(cursor.key).getTimestampMillisRange() + " for " + cursor.value.prettyPrint(); + } + return true; + } + @Override public void onFailure(String source, Exception e) { if (e instanceof FailedToCommitClusterStateException || e instanceof NotMasterException) { @@ -658,6 +713,7 @@ public static class StartedShardEntry extends TransportRequest { final String allocationId; final long primaryTerm; final String message; + final ShardLongFieldRange timestampMillisRange; StartedShardEntry(StreamInput in) throws IOException { super(in); @@ -676,13 +732,19 @@ public static class StartedShardEntry extends TransportRequest { final Exception ex = in.readException(); assert ex == null : "started shard must not have failure [" + ex + "]"; } + this.timestampMillisRange = ShardLongFieldRange.readFrom(in); } - public StartedShardEntry(final ShardId shardId, final String allocationId, final long primaryTerm, final String message) { + public StartedShardEntry(final ShardId shardId, + final String allocationId, + final long primaryTerm, + final String message, + final ShardLongFieldRange timestampMillisRange) { this.shardId = shardId; this.allocationId = allocationId; this.primaryTerm = primaryTerm; this.message = message; + this.timestampMillisRange = timestampMillisRange; } @Override @@ -699,6 +761,7 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getVersion().before(Version.V_6_3_0)) { out.writeException(null); } + timestampMillisRange.writeTo(out); } @Override diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java index b6a18ddfa2b3c..100c3fe26bd3f 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java @@ -55,6 +55,7 @@ import org.elasticsearch.index.Index; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.seqno.SequenceNumbers; +import org.elasticsearch.index.shard.IndexLongFieldRange; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.rest.RestStatus; @@ -340,6 +341,7 @@ public static APIBlock readFrom(StreamInput input) throws IOException { static final String KEY_ALIASES = "aliases"; static final String KEY_ROLLOVER_INFOS = "rollover_info"; static final String KEY_SYSTEM = "system"; + static final String KEY_TIMESTAMP_RANGE = "timestamp_range"; public static final String KEY_PRIMARY_TERMS = "primary_terms"; public static final String INDEX_STATE_FILE_PREFIX = "state-"; @@ -390,6 +392,8 @@ public static APIBlock readFrom(StreamInput input) throws IOException { private final ImmutableOpenMap rolloverInfos; private final boolean isSystem; + private final IndexLongFieldRange timestampMillisRange; + private IndexMetadata( final Index index, final long version, @@ -415,7 +419,8 @@ private IndexMetadata( final int routingPartitionSize, final ActiveShardCount waitForActiveShards, final ImmutableOpenMap rolloverInfos, - final boolean isSystem) { + final boolean isSystem, + final IndexLongFieldRange timestampMillisRange) { this.index = index; this.version = version; @@ -448,6 +453,7 @@ private IndexMetadata( this.waitForActiveShards = waitForActiveShards; this.rolloverInfos = rolloverInfos; this.isSystem = isSystem; + this.timestampMillisRange = timestampMillisRange; assert numberOfShards * routingFactor == routingNumShards : routingNumShards + " must be a multiple of " + numberOfShards; } @@ -661,6 +667,10 @@ public DiscoveryNodeFilters excludeFilters() { return excludeFilters; } + public IndexLongFieldRange getTimestampMillisRange() { + return timestampMillisRange; + } + @Override public boolean equals(Object o) { if (this == o) { @@ -770,6 +780,7 @@ private static class IndexMetadataDiff implements Diff { private final Diff>> inSyncAllocationIds; private final Diff> rolloverInfos; private final boolean isSystem; + private final IndexLongFieldRange timestampMillisRange; IndexMetadataDiff(IndexMetadata before, IndexMetadata after) { index = after.index.getName(); @@ -788,6 +799,7 @@ private static class IndexMetadataDiff implements Diff { DiffableUtils.getVIntKeySerializer(), DiffableUtils.StringSetValueSerializer.getInstance()); rolloverInfos = DiffableUtils.diff(before.rolloverInfos, after.rolloverInfos, DiffableUtils.getStringKeySerializer()); isSystem = after.isSystem; + timestampMillisRange = after.timestampMillisRange; } private static final DiffableUtils.DiffableValueReader ALIAS_METADATA_DIFF_VALUE_READER = @@ -833,6 +845,7 @@ private static class IndexMetadataDiff implements Diff { } else { isSystem = false; } + timestampMillisRange = IndexLongFieldRange.readFrom(in); } @Override @@ -862,6 +875,7 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getVersion().onOrAfter(SYSTEM_INDEX_FLAG_ADDED)) { out.writeBoolean(isSystem); } + timestampMillisRange.writeTo(out); } @Override @@ -881,6 +895,7 @@ public IndexMetadata apply(IndexMetadata part) { builder.inSyncAllocationIds.putAll(inSyncAllocationIds.apply(part.inSyncAllocationIds)); builder.rolloverInfos.putAll(rolloverInfos.apply(part.rolloverInfos)); builder.system(part.isSystem); + builder.timestampMillisRange(timestampMillisRange); return builder.build(); } } @@ -938,6 +953,7 @@ public static IndexMetadata readFrom(StreamInput in) throws IOException { if (in.getVersion().onOrAfter(SYSTEM_INDEX_FLAG_ADDED)) { builder.system(in.readBoolean()); } + builder.timestampMillisRange(IndexLongFieldRange.readFrom(in)); return builder.build(); } @@ -989,6 +1005,7 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getVersion().onOrAfter(SYSTEM_INDEX_FLAG_ADDED)) { out.writeBoolean(isSystem); } + timestampMillisRange.writeTo(out); } public boolean isSystem() { @@ -1020,6 +1037,7 @@ public static class Builder { private final ImmutableOpenMap.Builder rolloverInfos; private Integer routingNumShards; private boolean isSystem; + private IndexLongFieldRange timestampMillisRange = IndexLongFieldRange.NO_SHARDS; public Builder(String index) { this.index = index; @@ -1047,6 +1065,7 @@ public Builder(IndexMetadata indexMetadata) { this.inSyncAllocationIds = ImmutableOpenIntMap.builder(indexMetadata.inSyncAllocationIds); this.rolloverInfos = ImmutableOpenMap.builder(indexMetadata.rolloverInfos); this.isSystem = indexMetadata.isSystem; + this.timestampMillisRange = indexMetadata.timestampMillisRange; } public Builder index(String index) { @@ -1255,6 +1274,15 @@ public boolean isSystem() { return isSystem; } + public Builder timestampMillisRange(IndexLongFieldRange timestampMillisRange) { + this.timestampMillisRange = timestampMillisRange; + return this; + } + + public IndexLongFieldRange getTimestampMillisRange() { + return timestampMillisRange; + } + public IndexMetadata build() { ImmutableOpenMap.Builder tmpAliases = aliases; Settings tmpSettings = settings; @@ -1368,7 +1396,8 @@ public IndexMetadata build() { routingPartitionSize, waitForActiveShards, rolloverInfos.build(), - isSystem); + isSystem, + timestampMillisRange); } public static void toXContent(IndexMetadata indexMetadata, XContentBuilder builder, ToXContent.Params params) throws IOException { @@ -1469,6 +1498,10 @@ public static void toXContent(IndexMetadata indexMetadata, XContentBuilder build builder.endObject(); builder.field(KEY_SYSTEM, indexMetadata.isSystem); + builder.startObject(KEY_TIMESTAMP_RANGE); + indexMetadata.timestampMillisRange.toXContent(builder, params); + builder.endObject(); + builder.endObject(); } @@ -1549,6 +1582,8 @@ public static IndexMetadata fromXContent(XContentParser parser) throws IOExcepti // simply ignored when upgrading from 2.x assert Version.CURRENT.major <= 5; parser.skipChildren(); + } else if (KEY_TIMESTAMP_RANGE.equals(currentFieldName)) { + builder.timestampMillisRange(IndexLongFieldRange.fromXContent(parser)); } else { // assume it's custom index metadata builder.putCustom(currentFieldName, parser.mapStrings()); diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexStateService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexStateService.java index 36a996defe996..755a46ce1cb7e 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexStateService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexStateService.java @@ -68,6 +68,7 @@ import org.elasticsearch.common.util.concurrent.CountDown; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexNotFoundException; +import org.elasticsearch.index.shard.IndexLongFieldRange; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.ShardLimitValidator; @@ -769,6 +770,7 @@ static Tuple> closeRoutingTable(final Clus routingTable.remove(index.getName()); } else { metadata.put(updatedMetadata + .timestampMillisRange(IndexLongFieldRange.NO_SHARDS) .settingsVersion(indexMetadata.getSettingsVersion() + 1) .settings(Settings.builder() .put(indexMetadata.getSettings()) @@ -858,6 +860,7 @@ ClusterState openIndices(final Index[] indices, final ClusterState currentState) .state(IndexMetadata.State.OPEN) .settingsVersion(indexMetadata.getSettingsVersion() + 1) .settings(updatedSettings) + .timestampMillisRange(IndexLongFieldRange.NO_SHARDS) .build(); // The index might be closed because we couldn't import it due to old incompatible version diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/IndexMetadataUpdater.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/IndexMetadataUpdater.java index 9b07766ab09c1..d9d2caf738217 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/IndexMetadataUpdater.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/IndexMetadataUpdater.java @@ -170,6 +170,8 @@ private IndexMetadata.Builder updateInSyncAllocations(RoutingTable newRoutingTab final String allocationId; if (recoverySource == RecoverySource.ExistingStoreRecoverySource.FORCE_STALE_PRIMARY_INSTANCE) { allocationId = RecoverySource.ExistingStoreRecoverySource.FORCED_ALLOCATION_ID; + indexMetadataBuilder.timestampMillisRange(indexMetadataBuilder.getTimestampMillisRange() + .removeShard(shardId.id(), oldIndexMetadata.getNumberOfShards())); } else { assert recoverySource instanceof RecoverySource.SnapshotRecoverySource : recoverySource; allocationId = updates.initializedPrimary.allocationId().getId(); diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index e69735ed31682..b767313eb69a3 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -72,6 +72,7 @@ import org.elasticsearch.index.seqno.SeqNoStats; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.DocsStats; +import org.elasticsearch.index.shard.ShardLongFieldRange; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.translog.Translog; @@ -1980,4 +1981,12 @@ public interface TranslogRecoveryRunner { public enum HistorySource { TRANSLOG, INDEX } + + /** + * @return a {@link ShardLongFieldRange} containing the min and max raw values of the given field for this shard if the engine + * guarantees these values never to change, or {@link ShardLongFieldRange#EMPTY} if this field is empty, or + * {@link ShardLongFieldRange#UNKNOWN} if this field's value range may change in future. + */ + public abstract ShardLongFieldRange getRawFieldRange(String field) throws IOException; + } diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 0dfcc1c4a4885..3ede722fcdb7f 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -89,6 +89,7 @@ import org.elasticsearch.index.seqno.SeqNoStats; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.ElasticsearchMergePolicy; +import org.elasticsearch.index.shard.ShardLongFieldRange; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.translog.Translog; @@ -2957,4 +2958,9 @@ SeqNoFieldMapper.NAME, getPersistedLocalCheckpoint() + 1, Long.MAX_VALUE), Boole refresh("restore_version_map_and_checkpoint_tracker", SearcherScope.INTERNAL, true); } + @Override + public ShardLongFieldRange getRawFieldRange(String field) { + return ShardLongFieldRange.UNKNOWN; + } + } diff --git a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java index 288a61d76ba5b..48ce733b69049 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java @@ -18,9 +18,11 @@ */ package org.elasticsearch.index.engine; +import org.apache.lucene.document.LongPoint; import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.IndexCommit; import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.PointValues; import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.index.SoftDeletesDirectoryReaderWrapper; import org.apache.lucene.search.ReferenceManager; @@ -35,6 +37,7 @@ import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.seqno.SeqNoStats; import org.elasticsearch.index.seqno.SequenceNumbers; +import org.elasticsearch.index.shard.ShardLongFieldRange; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.TranslogConfig; @@ -530,4 +533,27 @@ protected static DirectoryReader openDirectory(Directory directory, boolean wrap public CompletionStats completionStats(String... fieldNamePatterns) { return completionStatsCache.get(fieldNamePatterns); } + + /** + * @return a {@link ShardLongFieldRange} containing the min and max raw values of the given field for this shard, or {@link + * ShardLongFieldRange#EMPTY} if this field is not found or empty. + */ + @Override + public ShardLongFieldRange getRawFieldRange(String field) throws IOException { + try (Searcher searcher = acquireSearcher("field_range")) { + final DirectoryReader directoryReader = searcher.getDirectoryReader(); + + final byte[] minPackedValue = PointValues.getMinPackedValue(directoryReader, field); + final byte[] maxPackedValue = PointValues.getMaxPackedValue(directoryReader, field); + + if (minPackedValue == null || maxPackedValue == null) { + assert minPackedValue == null && maxPackedValue == null + : Arrays.toString(minPackedValue) + "-" + Arrays.toString(maxPackedValue); + return ShardLongFieldRange.EMPTY; + } + + return ShardLongFieldRange.of(LongPoint.decodeDimension(minPackedValue, 0), LongPoint.decodeDimension(maxPackedValue, 0)); + } + } + } diff --git a/server/src/main/java/org/elasticsearch/index/mapper/DateFieldMapper.java b/server/src/main/java/org/elasticsearch/index/mapper/DateFieldMapper.java index 9acca3102d040..e3193e56befc0 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/DateFieldMapper.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/DateFieldMapper.java @@ -99,6 +99,16 @@ public long parsePointAsMillis(byte[] value) { return LongPoint.decodeDimension(value, 0); } + @Override + public long roundDownToMillis(long value) { + return value; + } + + @Override + public long roundUpToMillis(long value) { + return value; + } + @Override protected Query distanceFeatureQuery(String field, float boost, long origin, TimeValue pivot) { return LongPoint.newDistanceFeatureQuery(field, boost, origin, pivot.getMillis()); @@ -122,7 +132,22 @@ public Instant clampToValidRange(Instant instant) { @Override public long parsePointAsMillis(byte[] value) { - return DateUtils.toMilliSeconds(LongPoint.decodeDimension(value, 0)); + return roundDownToMillis(LongPoint.decodeDimension(value, 0)); + } + + @Override + public long roundDownToMillis(long value) { + return DateUtils.toMilliSeconds(value); + } + + @Override + public long roundUpToMillis(long value) { + if (value <= 0L) { + // if negative then throws an IAE; if zero then return zero + return DateUtils.toMilliSeconds(value); + } else { + return DateUtils.toMilliSeconds(value - 1L) + 1L; + } } @Override @@ -168,6 +193,16 @@ NumericType numericType() { */ public abstract long parsePointAsMillis(byte[] value); + /** + * Round the given raw value down to a number of milliseconds since the epoch. + */ + public abstract long roundDownToMillis(long value); + + /** + * Round the given raw value up to a number of milliseconds since the epoch. + */ + public abstract long roundUpToMillis(long value); + public static Resolution ofOrdinal(int ord) { for (Resolution resolution : values()) { if (ord == resolution.ordinal()) { diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexLongFieldRange.java b/server/src/main/java/org/elasticsearch/index/shard/IndexLongFieldRange.java new file mode 100644 index 0000000000000..3f6a6b0be058f --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexLongFieldRange.java @@ -0,0 +1,370 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.shard; + +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.ToXContentFragment; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import java.util.stream.IntStream; + +import static org.elasticsearch.index.shard.ShardLongFieldRange.LONG_FIELD_RANGE_VERSION_INTRODUCED; + +/** + * Class representing an (inclusive) range of {@code long} values in a field in an index which may comprise multiple shards. This + * information is accumulated shard-by-shard, and we keep track of which shards are represented in this value. Only once all shards are + * represented should this information be considered accurate for the index. + */ +public class IndexLongFieldRange implements Writeable, ToXContentFragment { + + /** + * Sentinel value indicating that no information is currently available, for instance because the index has just been created. + */ + public static final IndexLongFieldRange NO_SHARDS = new IndexLongFieldRange(new int[0], Long.MAX_VALUE, Long.MIN_VALUE); + + /** + * Sentinel value indicating an empty range, for instance because the field is missing or has no values in any shard. + */ + public static final IndexLongFieldRange EMPTY = new IndexLongFieldRange(null, Long.MAX_VALUE, Long.MIN_VALUE); + + /** + * Sentinel value indicating the actual range is unknown, for instance because more docs may be added in future. + */ + public static final IndexLongFieldRange UNKNOWN = new IndexLongFieldRange(null, Long.MIN_VALUE, Long.MAX_VALUE); + + @Nullable // if this range includes all shards + private final int[] shards; + private final long min, max; + + private IndexLongFieldRange(int[] shards, long min, long max) { + assert (min == Long.MAX_VALUE && max == Long.MIN_VALUE) || min <= max : min + " vs " + max; + assert shards == null || shards.length > 0 || (min == Long.MAX_VALUE && max == Long.MIN_VALUE) : Arrays.toString(shards); + assert shards == null || Arrays.equals(shards, Arrays.stream(shards).sorted().distinct().toArray()) : Arrays.toString(shards); + this.shards = shards; + this.min = min; + this.max = max; + } + + /** + * @return whether this range includes information from all shards yet. + */ + public boolean isComplete() { + return shards == null; + } + + // exposed for testing + int[] getShards() { + return shards; + } + + // exposed for testing + long getMinUnsafe() { + return min; + } + + // exposed for testing + long getMaxUnsafe() { + return max; + } + + /** + * @return the (inclusive) minimum of this range. + */ + public long getMin() { + assert shards == null : "min is meaningless if we don't have data from all shards yet"; + assert this != EMPTY : "min is meaningless if range is empty"; + assert this != UNKNOWN : "min is meaningless if range is unknown"; + return min; + } + + /** + * @return the (inclusive) maximum of this range. + */ + public long getMax() { + assert shards == null : "max is meaningless if we don't have data from all shards yet"; + assert this != EMPTY : "max is meaningless if range is empty"; + assert this != UNKNOWN : "max is meaningless if range is unknown"; + return max; + } + + private static final byte WIRE_TYPE_OTHER = (byte)0; + private static final byte WIRE_TYPE_NO_SHARDS = (byte)1; + private static final byte WIRE_TYPE_UNKNOWN = (byte)2; + private static final byte WIRE_TYPE_EMPTY = (byte)3; + + @Override + public void writeTo(StreamOutput out) throws IOException { + if (out.getVersion().onOrAfter(LONG_FIELD_RANGE_VERSION_INTRODUCED)) { + if (this == NO_SHARDS) { + out.writeByte(WIRE_TYPE_NO_SHARDS); + } else if (this == UNKNOWN) { + out.writeByte(WIRE_TYPE_UNKNOWN); + } else if (this == EMPTY) { + out.writeByte(WIRE_TYPE_EMPTY); + } else { + out.writeByte(WIRE_TYPE_OTHER); + if (shards == null) { + out.writeBoolean(false); + } else { + out.writeBoolean(true); + out.writeVIntArray(shards); + } + out.writeZLong(min); + out.writeZLong(max); + } + } + } + + public static IndexLongFieldRange readFrom(StreamInput in) throws IOException { + if (in.getVersion().before(LONG_FIELD_RANGE_VERSION_INTRODUCED)) { + // conservative treatment for BWC + return UNKNOWN; + } + + final byte type = in.readByte(); + switch (type) { + case WIRE_TYPE_NO_SHARDS: + return NO_SHARDS; + case WIRE_TYPE_UNKNOWN: + return UNKNOWN; + case WIRE_TYPE_EMPTY: + return EMPTY; + case WIRE_TYPE_OTHER: + return new IndexLongFieldRange(in.readBoolean() ? in.readVIntArray() : null, in.readZLong(), in.readZLong()); + default: + throw new IllegalStateException("type [" + type + "] not known"); + } + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + if (this == UNKNOWN) { + builder.field("unknown", true); + } else if (this == EMPTY) { + builder.field("empty", true); + } else if (this == NO_SHARDS) { + builder.startArray("shards"); + builder.endArray(); + } else { + builder.field("min", min); + builder.field("max", max); + if (shards != null) { + builder.startArray("shards"); + for (int shard : shards) { + builder.value(shard); + } + builder.endArray(); + } + } + return builder; + } + + public static IndexLongFieldRange fromXContent(XContentParser parser) throws IOException { + XContentParser.Token token; + String currentFieldName = null; + Boolean isUnknown = null; + Boolean isEmpty = null; + Long min = null; + Long max = null; + List shardsList = null; + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + if (token == XContentParser.Token.FIELD_NAME) { + currentFieldName = parser.currentName(); + } else if (token.isValue()) { + if ("unknown".equals(currentFieldName)) { + if (Boolean.FALSE.equals(isUnknown)) { + throw new IllegalArgumentException("unexpected field 'unknown'"); + } else { + isUnknown = Boolean.TRUE; + isEmpty = Boolean.FALSE; + } + } else if ("empty".equals(currentFieldName)) { + if (Boolean.FALSE.equals(isEmpty)) { + throw new IllegalArgumentException("unexpected field 'empty'"); + } else { + isUnknown = Boolean.FALSE; + isEmpty = Boolean.TRUE; + } + } else if ("min".equals(currentFieldName)) { + if (Boolean.TRUE.equals(isUnknown) || Boolean.TRUE.equals(isEmpty)) { + throw new IllegalArgumentException("unexpected field 'min'"); + } else { + isUnknown = Boolean.FALSE; + isEmpty = Boolean.FALSE; + min = parser.longValue(); + } + } else if ("max".equals(currentFieldName)) { + if (Boolean.TRUE.equals(isUnknown) || Boolean.TRUE.equals(isEmpty)) { + throw new IllegalArgumentException("unexpected field 'max'"); + } else { + isUnknown = Boolean.FALSE; + isEmpty = Boolean.FALSE; + max = parser.longValue(); + } + } + } else if (token == XContentParser.Token.START_ARRAY) { + if ("shards".equals(currentFieldName)) { + if (Boolean.TRUE.equals(isUnknown) || Boolean.TRUE.equals(isEmpty) || shardsList != null) { + throw new IllegalArgumentException("unexpected array 'shards'"); + } else { + isUnknown = Boolean.FALSE; + isEmpty = Boolean.FALSE; + shardsList = new ArrayList<>(); + while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) { + if (token.isValue()) { + shardsList.add(parser.intValue()); + } + } + } + } else { + throw new IllegalArgumentException("Unexpected array: " + currentFieldName); + } + } else { + throw new IllegalArgumentException("Unexpected token: " + token); + } + } + + if (Boolean.TRUE.equals(isUnknown)) { + //noinspection ConstantConditions this assertion is always true but left here for the benefit of readers + assert min == null && max == null && shardsList == null && Boolean.FALSE.equals(isEmpty); + return UNKNOWN; + } else if (Boolean.TRUE.equals(isEmpty)) { + //noinspection ConstantConditions this assertion is always true but left here for the benefit of readers + assert min == null && max == null && shardsList == null && Boolean.FALSE.equals(isUnknown); + return EMPTY; + } else if (shardsList != null && shardsList.isEmpty()) { + //noinspection ConstantConditions this assertion is always true but left here for the benefit of readers + assert min == null && max == null && Boolean.FALSE.equals(isEmpty) && Boolean.FALSE.equals(isUnknown); + return NO_SHARDS; + } else if (min != null) { + //noinspection ConstantConditions this assertion is always true but left here for the benefit of readers + assert Boolean.FALSE.equals(isUnknown) && Boolean.FALSE.equals(isEmpty); + if (max == null) { + throw new IllegalArgumentException("field 'max' unexpectedly missing"); + } + final int[] shards; + if (shardsList != null) { + shards = shardsList.stream().mapToInt(i -> i).toArray(); + assert shards.length > 0; + } else { + shards = null; + } + return new IndexLongFieldRange(shards, min, max); + } else { + throw new IllegalArgumentException("field range contents unexpectedly missing"); + } + } + + public IndexLongFieldRange extendWithShardRange(int shardId, int shardCount, ShardLongFieldRange shardFieldRange) { + if (shardFieldRange == ShardLongFieldRange.UNKNOWN) { + assert shards == null + ? this == UNKNOWN + : Arrays.stream(shards).noneMatch(i -> i == shardId); + return UNKNOWN; + } + if (shards == null || Arrays.stream(shards).anyMatch(i -> i == shardId)) { + assert shardFieldRange == ShardLongFieldRange.EMPTY || min <= shardFieldRange.getMin() && shardFieldRange.getMax() <= max; + return this; + } + final int[] newShards; + if (shards.length == shardCount - 1) { + assert Arrays.equals(shards, IntStream.range(0, shardCount).filter(i -> i != shardId).toArray()) + : Arrays.toString(shards) + " + " + shardId; + if (shardFieldRange == ShardLongFieldRange.EMPTY && min == EMPTY.min && max == EMPTY.max) { + return EMPTY; + } + newShards = null; + } else { + newShards = IntStream.concat(Arrays.stream(this.shards), IntStream.of(shardId)).sorted().toArray(); + } + if (shardFieldRange == ShardLongFieldRange.EMPTY) { + return new IndexLongFieldRange(newShards, min, max); + } else { + return new IndexLongFieldRange(newShards, Math.min(shardFieldRange.getMin(), min), Math.max(shardFieldRange.getMax(), max)); + } + } + + @Override + public String toString() { + if (this == NO_SHARDS) { + return "NO_SHARDS"; + } else if (this == UNKNOWN) { + return "UNKNOWN"; + } else if (this == EMPTY) { + return "EMPTY"; + } else if (shards == null) { + return "[" + min + "-" + max + "]"; + } else { + return "[" + min + "-" + max + ", shards=" + Arrays.toString(shards) + "]"; + } + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + if (this == EMPTY || this == UNKNOWN || this == NO_SHARDS || o == EMPTY || o == UNKNOWN || o == NO_SHARDS) return false; + IndexLongFieldRange that = (IndexLongFieldRange) o; + return min == that.min && + max == that.max && + Arrays.equals(shards, that.shards); + } + + @Override + public int hashCode() { + int result = Objects.hash(min, max); + result = 31 * result + Arrays.hashCode(shards); + return result; + } + + /** + * Remove the given shard from the set of known shards, possibly without adjusting the min and max. Used when allocating a stale primary + * which may have a different range from the original, so we must allow the range to grow. Note that this doesn't usually allow the + * range to shrink, so we may in theory hit this shard more than needed after allocating a stale primary. + */ + public IndexLongFieldRange removeShard(int shardId, int numberOfShards) { + assert 0 <= shardId && shardId < numberOfShards : shardId + " vs " + numberOfShards; + + if (shards != null && Arrays.stream(shards).noneMatch(i -> i == shardId)) { + return this; + } + if (shards == null && numberOfShards == 1) { + return NO_SHARDS; + } + if (this == UNKNOWN) { + return this; + } + if (shards != null && shards.length == 1 && shards[0] == shardId) { + return NO_SHARDS; + } + + final IntStream currentShards = shards == null ? IntStream.range(0, numberOfShards) : Arrays.stream(shards); + return new IndexLongFieldRange(currentShards.filter(i -> i != shardId).toArray(), min, max); + } +} diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 90acd27b937e5..2d4824bb4387c 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -50,6 +50,7 @@ import org.elasticsearch.action.admin.indices.upgrade.post.UpgradeRequest; import org.elasticsearch.action.support.replication.PendingReplicationActions; import org.elasticsearch.action.support.replication.ReplicationResponse; +import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.MappingMetadata; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; @@ -107,10 +108,12 @@ import org.elasticsearch.index.flush.FlushStats; import org.elasticsearch.index.get.GetStats; import org.elasticsearch.index.get.ShardGetService; +import org.elasticsearch.index.mapper.DateFieldMapper; import org.elasticsearch.index.mapper.DocumentMapper; import org.elasticsearch.index.mapper.DocumentMapperForType; import org.elasticsearch.index.mapper.IdFieldMapper; import org.elasticsearch.index.mapper.MapperParsingException; +import org.elasticsearch.index.mapper.MappedFieldType; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.Mapping; import org.elasticsearch.index.mapper.ParsedDocument; @@ -1772,6 +1775,41 @@ public RecoveryState recoveryState() { return this.recoveryState; } + @Override + public ShardLongFieldRange getTimestampMillisRange() { + if (mapperService() == null) { + return ShardLongFieldRange.UNKNOWN; // no mapper service, no idea if the field even exists + } + final MappedFieldType mappedFieldType = mapperService().fieldType(DataStream.TimestampField.FIXED_TIMESTAMP_FIELD); + if (mappedFieldType instanceof DateFieldMapper.DateFieldType == false) { + return ShardLongFieldRange.UNKNOWN; // field missing or not a date + } + final DateFieldMapper.DateFieldType dateFieldType = (DateFieldMapper.DateFieldType) mappedFieldType; + + final ShardLongFieldRange rawTimestampFieldRange; + try { + rawTimestampFieldRange = getEngine().getRawFieldRange(DataStream.TimestampField.FIXED_TIMESTAMP_FIELD); + } catch (IOException | AlreadyClosedException e) { + logger.debug("exception obtaining range for timestamp field", e); + return ShardLongFieldRange.UNKNOWN; + } + if (rawTimestampFieldRange == ShardLongFieldRange.UNKNOWN) { + return ShardLongFieldRange.UNKNOWN; + } + if (rawTimestampFieldRange == ShardLongFieldRange.EMPTY) { + return ShardLongFieldRange.EMPTY; + } + + try { + return ShardLongFieldRange.of( + dateFieldType.resolution().roundDownToMillis(rawTimestampFieldRange.getMin()), + dateFieldType.resolution().roundUpToMillis(rawTimestampFieldRange.getMax())); + } catch (IllegalArgumentException e) { + logger.debug(new ParameterizedMessage("could not convert {} to a millisecond time range", rawTimestampFieldRange), e); + return ShardLongFieldRange.UNKNOWN; // any search might match this shard + } + } + /** * perform the last stages of recovery once all translog operations are done. * note that you should still call {@link #postRecovery(String)}. @@ -2719,7 +2757,7 @@ private void executeRecovery(String reason, RecoveryState recoveryState, PeerRec markAsRecovering(reason, recoveryState); // mark the shard as recovering on the cluster state thread threadPool.generic().execute(ActionRunnable.wrap(ActionListener.wrap(r -> { if (r) { - recoveryListener.onRecoveryDone(recoveryState); + recoveryListener.onRecoveryDone(recoveryState, getTimestampMillisRange()); } }, e -> recoveryListener.onRecoveryFailure(recoveryState, new RecoveryFailedException(recoveryState, null, e), true)), action)); diff --git a/server/src/main/java/org/elasticsearch/index/shard/ShardLongFieldRange.java b/server/src/main/java/org/elasticsearch/index/shard/ShardLongFieldRange.java new file mode 100644 index 0000000000000..ff54df299dbfe --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/shard/ShardLongFieldRange.java @@ -0,0 +1,141 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.shard; + +import org.elasticsearch.Version; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; + +import java.io.IOException; +import java.util.Objects; + +/** + * Class representing an (inclusive) range of {@code long} values in a field in a single shard. + */ +public class ShardLongFieldRange implements Writeable { + + public static final Version LONG_FIELD_RANGE_VERSION_INTRODUCED = Version.V_7_11_0; + + /** + * Sentinel value indicating an empty range, for instance because the field is missing or has no values. + */ + public static final ShardLongFieldRange EMPTY = new ShardLongFieldRange(Long.MAX_VALUE, Long.MIN_VALUE); + + /** + * Sentinel value indicating the actual range is unknown, for instance because more docs may be added in future. + */ + public static final ShardLongFieldRange UNKNOWN = new ShardLongFieldRange(Long.MIN_VALUE, Long.MAX_VALUE); + + /** + * Construct a new {@link ShardLongFieldRange} with the given (inclusive) minimum and maximum. + */ + public static ShardLongFieldRange of(long min, long max) { + assert min <= max : min + " vs " + max; + return new ShardLongFieldRange(min, max); + } + + private final long min, max; + + private ShardLongFieldRange(long min, long max) { + this.min = min; + this.max = max; + } + + /** + * @return the (inclusive) minimum of this range. + */ + public long getMin() { + assert this != EMPTY && this != UNKNOWN && min <= max: "must not use actual min of sentinel values"; + return min; + } + + /** + * @return the (inclusive) maximum of this range. + */ + public long getMax() { + assert this != EMPTY && this != UNKNOWN && min <= max : "must not use actual max of sentinel values"; + return max; + } + + @Override + public String toString() { + if (this == UNKNOWN) { + return "UNKNOWN"; + } else if (this == EMPTY) { + return "EMPTY"; + } else { + return "[" + min + "-" + max + "]"; + } + } + + private static final byte WIRE_TYPE_OTHER = (byte)0; + private static final byte WIRE_TYPE_UNKNOWN = (byte)1; + private static final byte WIRE_TYPE_EMPTY = (byte)2; + + public static ShardLongFieldRange readFrom(StreamInput in) throws IOException { + if (in.getVersion().before(LONG_FIELD_RANGE_VERSION_INTRODUCED)) { + // conservative treatment for BWC + return UNKNOWN; + } + + final byte type = in.readByte(); + switch (type) { + case WIRE_TYPE_UNKNOWN: + return UNKNOWN; + case WIRE_TYPE_EMPTY: + return EMPTY; + case WIRE_TYPE_OTHER: + return ShardLongFieldRange.of(in.readZLong(), in.readZLong()); + default: + throw new IllegalStateException("type [" + type + "] not known"); + } + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + if (out.getVersion().onOrAfter(LONG_FIELD_RANGE_VERSION_INTRODUCED)) { + if (this == UNKNOWN) { + out.writeByte(WIRE_TYPE_UNKNOWN); + } else if (this == EMPTY) { + out.writeByte(WIRE_TYPE_EMPTY); + } else { + out.writeByte(WIRE_TYPE_OTHER); + out.writeZLong(min); + out.writeZLong(max); + } + } + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + if (this == EMPTY || this == UNKNOWN || o == EMPTY || o == UNKNOWN) return false; + final ShardLongFieldRange that = (ShardLongFieldRange) o; + return min == that.min && max == that.max; + } + + @Override + public int hashCode() { + return Objects.hash(min, max); + } +} + diff --git a/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java index f918bdf97f132..ff2d71ea3a4fc 100644 --- a/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java +++ b/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java @@ -58,6 +58,7 @@ import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardRelocatedException; import org.elasticsearch.index.shard.IndexShardState; +import org.elasticsearch.index.shard.ShardLongFieldRange; import org.elasticsearch.index.shard.PrimaryReplicaSyncer; import org.elasticsearch.index.shard.PrimaryReplicaSyncer.ResyncTask; import org.elasticsearch.index.shard.ShardId; @@ -635,9 +636,14 @@ private void updateShard(DiscoveryNodes nodes, ShardRouting shardRouting, Shard shardRouting.shardId(), state, nodes.getMasterNode()); } if (nodes.getMasterNode() != null) { - shardStateAction.shardStarted(shardRouting, primaryTerm, "master " + nodes.getMasterNode() + - " marked shard as initializing, but shard state is [" + state + "], mark shard as started", - SHARD_STATE_ACTION_LISTENER, clusterState); + shardStateAction.shardStarted( + shardRouting, + primaryTerm, + "master " + nodes.getMasterNode() + " marked shard as initializing, but shard state is [" + state + + "], mark shard as started", + shard.getTimestampMillisRange(), + SHARD_STATE_ACTION_LISTENER, + clusterState); } } } @@ -691,8 +697,13 @@ private RecoveryListener(final ShardRouting shardRouting, final long primaryTerm } @Override - public void onRecoveryDone(final RecoveryState state) { - shardStateAction.shardStarted(shardRouting, primaryTerm, "after " + state.getRecoverySource(), SHARD_STATE_ACTION_LISTENER); + public void onRecoveryDone(final RecoveryState state, ShardLongFieldRange timestampMillisFieldRange) { + shardStateAction.shardStarted( + shardRouting, + primaryTerm, + "after " + state.getRecoverySource(), + timestampMillisFieldRange, + SHARD_STATE_ACTION_LISTENER); } @Override @@ -784,6 +795,13 @@ public interface Shard { */ RecoveryState recoveryState(); + /** + * @return the range of the {@code @timestamp} field for this shard, in milliseconds since the epoch, or {@link + * ShardLongFieldRange#EMPTY} if this field is not found, or {@link ShardLongFieldRange#UNKNOWN} if its range is not fixed. + */ + @Nullable + ShardLongFieldRange getTimestampMillisRange(); + /** * Updates the shard state based on an incoming cluster state: * - Updates and persists the new routing value. diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java index 279e9b1d35220..8d064f7661ec0 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java @@ -51,6 +51,7 @@ import org.elasticsearch.index.shard.IllegalIndexShardStateException; import org.elasticsearch.index.shard.IndexEventListener; import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.ShardLongFieldRange; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardNotFoundException; import org.elasticsearch.index.store.Store; @@ -272,7 +273,7 @@ public static StartRecoveryRequest getStartRecoveryRequest(Logger logger, Discov } public interface RecoveryListener { - void onRecoveryDone(RecoveryState state); + void onRecoveryDone(RecoveryState state, ShardLongFieldRange timestampMillisFieldRange); void onRecoveryFailure(RecoveryState state, RecoveryFailedException e, boolean sendShardFailure); } diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java index 276539d18295b..22efbd9a90249 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java @@ -257,7 +257,7 @@ public void markAsDone() { // release the initial reference. recovery files will be cleaned as soon as ref count goes to zero, potentially now decRef(); } - listener.onRecoveryDone(state()); + listener.onRecoveryDone(state(), indexShard.getTimestampMillisRange()); } } diff --git a/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java b/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java index 3cbe788984fb4..6651139a23d3b 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java @@ -69,6 +69,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.shard.IndexLongFieldRange; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.ShardLimitValidator; @@ -348,7 +349,8 @@ public ClusterState execute(ClusterState currentState) { .index(renamedIndexName); indexMdBuilder.settings(Settings.builder() .put(snapshotIndexMetadata.getSettings()) - .put(IndexMetadata.SETTING_INDEX_UUID, UUIDs.randomBase64UUID())); + .put(IndexMetadata.SETTING_INDEX_UUID, UUIDs.randomBase64UUID())) + .timestampMillisRange(IndexLongFieldRange.NO_SHARDS); shardLimitValidator.validateShardLimit(snapshotIndexMetadata.getSettings(), currentState); if (!request.includeAliases() && !snapshotIndexMetadata.getAliases().isEmpty()) { // Remove all aliases - they shouldn't be restored @@ -381,6 +383,7 @@ public ClusterState execute(ClusterState currentState) { 1 + currentIndexMetadata.getSettingsVersion())); indexMdBuilder.aliasesVersion( Math.max(snapshotIndexMetadata.getAliasesVersion(), 1 + currentIndexMetadata.getAliasesVersion())); + indexMdBuilder.timestampMillisRange(IndexLongFieldRange.NO_SHARDS); for (int shard = 0; shard < snapshotIndexMetadata.getNumberOfShards(); shard++) { indexMdBuilder.primaryTerm(shard, diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/reroute/ClusterRerouteResponseTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/reroute/ClusterRerouteResponseTests.java index 7109e2fa282b3..704f1c0416490 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/reroute/ClusterRerouteResponseTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/reroute/ClusterRerouteResponseTests.java @@ -122,7 +122,10 @@ public void testToXContent() throws IOException { " \"0\" : [ ]\n" + " },\n" + " \"rollover_info\" : { },\n" + - " \"system\" : false\n" + + " \"system\" : false,\n" + + " \"timestamp_range\" : {\n" + + " \"shards\" : [ ]\n" + + " }\n" + " }\n" + " },\n" + " \"index-graveyard\" : {\n" + @@ -220,7 +223,10 @@ public void testToXContent() throws IOException { " \"0\" : [ ]\n" + " },\n" + " \"rollover_info\" : { },\n" + - " \"system\" : false\n" + + " \"system\" : false,\n" + + " \"timestamp_range\" : {\n" + + " \"shards\" : [ ]\n" + + " }\n" + " }\n" + " },\n" + " \"index-graveyard\" : {\n" + diff --git a/server/src/test/java/org/elasticsearch/cluster/ClusterStateTests.java b/server/src/test/java/org/elasticsearch/cluster/ClusterStateTests.java index 4ee23f298209f..b3913206b5bdd 100644 --- a/server/src/test/java/org/elasticsearch/cluster/ClusterStateTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/ClusterStateTests.java @@ -245,7 +245,10 @@ public void testToXContent() throws IOException { " \"time\" : 1\n" + " }\n" + " },\n" + - " \"system\" : false\n" + + " \"system\" : false,\n" + + " \"timestamp_range\" : {\n" + + " \"shards\" : [ ]\n" + + " }\n" + " }\n" + " },\n" + " \"index-graveyard\" : {\n" + @@ -429,7 +432,10 @@ public void testToXContent_FlatSettingTrue_ReduceMappingFalse() throws IOExcepti " \"time\" : 1\n" + " }\n" + " },\n" + - " \"system\" : false\n" + + " \"system\" : false,\n" + + " \"timestamp_range\" : {\n" + + " \"shards\" : [ ]\n" + + " }\n" + " }\n" + " },\n" + " \"index-graveyard\" : {\n" + @@ -622,7 +628,10 @@ public void testToXContent_FlatSettingFalse_ReduceMappingTrue() throws IOExcepti " \"time\" : 1\n" + " }\n" + " },\n" + - " \"system\" : false\n" + + " \"system\" : false,\n" + + " \"timestamp_range\" : {\n" + + " \"shards\" : [ ]\n" + + " }\n" + " }\n" + " },\n" + " \"index-graveyard\" : {\n" + @@ -749,7 +758,10 @@ public void testToXContentSameTypeName() throws IOException { " \"0\" : [ ]\n" + " },\n" + " \"rollover_info\" : { },\n" + - " \"system\" : false\n" + + " \"system\" : false,\n" + + " \"timestamp_range\" : {\n" + + " \"shards\" : [ ]\n" + + " }\n" + " }\n" + " },\n" + " \"index-graveyard\" : {\n" + diff --git a/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStartedClusterStateTaskExecutorTests.java b/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStartedClusterStateTaskExecutorTests.java index e318a239e3339..d0fef4d02f8d0 100644 --- a/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStartedClusterStateTaskExecutorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStartedClusterStateTaskExecutorTests.java @@ -32,7 +32,9 @@ import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.common.Priority; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.shard.IndexLongFieldRange; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.shard.ShardLongFieldRange; import java.util.ArrayList; import java.util.Collections; @@ -50,6 +52,7 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.sameInstance; public class ShardStartedClusterStateTaskExecutorTests extends ESAllocationTestCase { @@ -78,7 +81,12 @@ public void testEmptyTaskListProducesSameClusterState() throws Exception { public void testNonExistentIndexMarkedAsSuccessful() throws Exception { final ClusterState clusterState = stateWithNoShard(); - final StartedShardEntry entry = new StartedShardEntry(new ShardId("test", "_na", 0), "aId", randomNonNegativeLong(), "test"); + final StartedShardEntry entry = new StartedShardEntry( + new ShardId("test", "_na", 0), + "aId", + randomNonNegativeLong(), + "test", + ShardLongFieldRange.UNKNOWN); final ClusterStateTaskExecutor.ClusterTasksResult result = executeTasks(clusterState, singletonList(entry)); assertSame(clusterState, result.resultingState); @@ -95,10 +103,20 @@ public void testNonExistentShardsAreMarkedAsSuccessful() throws Exception { final List tasks = Stream.concat( // Existent shard id but different allocation id IntStream.range(0, randomIntBetween(1, 5)) - .mapToObj(i -> new StartedShardEntry(new ShardId(indexMetadata.getIndex(), 0), String.valueOf(i), 0L, "allocation id")), + .mapToObj(i -> new StartedShardEntry( + new ShardId(indexMetadata.getIndex(), 0), + String.valueOf(i), + 0L, + "allocation id", + ShardLongFieldRange.UNKNOWN)), // Non existent shard id IntStream.range(1, randomIntBetween(2, 5)) - .mapToObj(i -> new StartedShardEntry(new ShardId(indexMetadata.getIndex(), i), String.valueOf(i), 0L, "shard id")) + .mapToObj(i -> new StartedShardEntry( + new ShardId(indexMetadata.getIndex(), i), + String.valueOf(i), + 0L, + "shard id", + ShardLongFieldRange.UNKNOWN)) ).collect(Collectors.toList()); @@ -127,7 +145,7 @@ public void testNonInitializingShardAreMarkedAsSuccessful() throws Exception { allocationId = shardRoutingTable.replicaShards().iterator().next().allocationId().getId(); } final long primaryTerm = indexMetadata.primaryTerm(shardId.id()); - return new StartedShardEntry(shardId, allocationId, primaryTerm, "test"); + return new StartedShardEntry(shardId, allocationId, primaryTerm, "test", ShardLongFieldRange.UNKNOWN); }).collect(Collectors.toList()); final ClusterStateTaskExecutor.ClusterTasksResult result = executeTasks(clusterState, tasks); @@ -150,11 +168,11 @@ public void testStartedShards() throws Exception { final String primaryAllocationId = primaryShard.allocationId().getId(); final List tasks = new ArrayList<>(); - tasks.add(new StartedShardEntry(shardId, primaryAllocationId, primaryTerm, "test")); + tasks.add(new StartedShardEntry(shardId, primaryAllocationId, primaryTerm, "test", ShardLongFieldRange.UNKNOWN)); if (randomBoolean()) { final ShardRouting replicaShard = clusterState.routingTable().shardRoutingTable(shardId).replicaShards().iterator().next(); final String replicaAllocationId = replicaShard.allocationId().getId(); - tasks.add(new StartedShardEntry(shardId, replicaAllocationId, primaryTerm, "test")); + tasks.add(new StartedShardEntry(shardId, replicaAllocationId, primaryTerm, "test", ShardLongFieldRange.UNKNOWN)); } final ClusterStateTaskExecutor.ClusterTasksResult result = executeTasks(clusterState, tasks); assertNotSame(clusterState, result.resultingState); @@ -179,7 +197,7 @@ public void testDuplicateStartsAreOkay() throws Exception { final long primaryTerm = indexMetadata.primaryTerm(shardId.id()); final List tasks = IntStream.range(0, randomIntBetween(2, 10)) - .mapToObj(i -> new StartedShardEntry(shardId, allocationId, primaryTerm, "test")) + .mapToObj(i -> new StartedShardEntry(shardId, allocationId, primaryTerm, "test", ShardLongFieldRange.UNKNOWN)) .collect(Collectors.toList()); final ClusterStateTaskExecutor.ClusterTasksResult result = executeTasks(clusterState, tasks); @@ -210,8 +228,12 @@ public void testPrimaryTermsMismatch() throws Exception { final ShardId shardId = new ShardId(clusterState.metadata().index(indexName).getIndex(), shard); final String primaryAllocationId = clusterState.routingTable().shardRoutingTable(shardId).primaryShard().allocationId().getId(); { - final StartedShardEntry task = - new StartedShardEntry(shardId, primaryAllocationId, primaryTerm - 1, "primary terms does not match on primary"); + final StartedShardEntry task = new StartedShardEntry( + shardId, + primaryAllocationId, + primaryTerm - 1, + "primary terms does not match on primary", + ShardLongFieldRange.UNKNOWN); final ClusterStateTaskExecutor.ClusterTasksResult result = executeTasks(clusterState, singletonList(task)); assertSame(clusterState, result.resultingState); @@ -223,8 +245,8 @@ public void testPrimaryTermsMismatch() throws Exception { assertSame(clusterState, result.resultingState); } { - final StartedShardEntry task = - new StartedShardEntry(shardId, primaryAllocationId, primaryTerm, "primary terms match on primary"); + final StartedShardEntry task = new StartedShardEntry( + shardId, primaryAllocationId, primaryTerm, "primary terms match on primary", ShardLongFieldRange.UNKNOWN); final ClusterStateTaskExecutor.ClusterTasksResult result = executeTasks(clusterState, singletonList(task)); assertNotSame(clusterState, result.resultingState); @@ -241,7 +263,8 @@ public void testPrimaryTermsMismatch() throws Exception { final String replicaAllocationId = clusterState.routingTable().shardRoutingTable(shardId).replicaShards().iterator().next() .allocationId().getId(); - final StartedShardEntry task = new StartedShardEntry(shardId, replicaAllocationId, replicaPrimaryTerm, "test on replica"); + final StartedShardEntry task = new StartedShardEntry( + shardId, replicaAllocationId, replicaPrimaryTerm, "test on replica", ShardLongFieldRange.UNKNOWN); final ClusterStateTaskExecutor.ClusterTasksResult result = executeTasks(clusterState, singletonList(task)); assertNotSame(clusterState, result.resultingState); @@ -254,6 +277,51 @@ public void testPrimaryTermsMismatch() throws Exception { } } + public void testExpandsTimestampRange() throws Exception { + final String indexName = "test"; + final ClusterState clusterState = state(indexName, randomBoolean(), ShardRoutingState.INITIALIZING, ShardRoutingState.INITIALIZING); + + final IndexMetadata indexMetadata = clusterState.metadata().index(indexName); + final ShardId shardId = new ShardId(indexMetadata.getIndex(), 0); + final long primaryTerm = indexMetadata.primaryTerm(shardId.id()); + final ShardRouting primaryShard = clusterState.routingTable().shardRoutingTable(shardId).primaryShard(); + final String primaryAllocationId = primaryShard.allocationId().getId(); + + assertThat(indexMetadata.getTimestampMillisRange(), sameInstance(IndexLongFieldRange.NO_SHARDS)); + + final ShardLongFieldRange shardTimestampMillisRange = randomBoolean() ? ShardLongFieldRange.UNKNOWN : + randomBoolean() ? ShardLongFieldRange.EMPTY : ShardLongFieldRange.of(1606407943000L, 1606407944000L); + + final List tasks = new ArrayList<>(); + tasks.add(new StartedShardEntry(shardId, primaryAllocationId, primaryTerm, "test", shardTimestampMillisRange)); + if (randomBoolean()) { + final ShardRouting replicaShard = clusterState.routingTable().shardRoutingTable(shardId).replicaShards().iterator().next(); + final String replicaAllocationId = replicaShard.allocationId().getId(); + tasks.add(new StartedShardEntry(shardId, replicaAllocationId, primaryTerm, "test", shardTimestampMillisRange)); + } + final ClusterStateTaskExecutor.ClusterTasksResult result = executeTasks(clusterState, tasks); + assertNotSame(clusterState, result.resultingState); + assertThat(result.executionResults.size(), equalTo(tasks.size())); + tasks.forEach(task -> { + assertThat(result.executionResults.containsKey(task), is(true)); + assertThat(((ClusterStateTaskExecutor.TaskResult) result.executionResults.get(task)).isSuccess(), is(true)); + + final IndexShardRoutingTable shardRoutingTable = result.resultingState.routingTable().shardRoutingTable(task.shardId); + assertThat(shardRoutingTable.getByAllocationId(task.allocationId).state(), is(ShardRoutingState.STARTED)); + + final IndexLongFieldRange timestampMillisRange = result.resultingState.metadata().index(indexName).getTimestampMillisRange(); + if (shardTimestampMillisRange == ShardLongFieldRange.UNKNOWN) { + assertThat(timestampMillisRange, sameInstance(IndexLongFieldRange.UNKNOWN)); + } else if (shardTimestampMillisRange == ShardLongFieldRange.EMPTY) { + assertThat(timestampMillisRange, sameInstance(IndexLongFieldRange.EMPTY)); + } else { + assertTrue(timestampMillisRange.isComplete()); + assertThat(timestampMillisRange.getMin(), equalTo(shardTimestampMillisRange.getMin())); + assertThat(timestampMillisRange.getMax(), equalTo(shardTimestampMillisRange.getMax())); + } + }); + } + private ClusterStateTaskExecutor.ClusterTasksResult executeTasks(final ClusterState state, final List tasks) throws Exception { final ClusterStateTaskExecutor.ClusterTasksResult result = executor.execute(state, tasks); diff --git a/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java b/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java index d7d547503e2a5..fe1c9aef10395 100644 --- a/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java @@ -42,7 +42,9 @@ import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.index.shard.ShardLongFieldRange; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.shard.ShardLongFieldRangeWireTests; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.VersionUtils; import org.elasticsearch.test.transport.CapturingTransport; @@ -76,6 +78,7 @@ import static org.elasticsearch.test.VersionUtils.randomCompatibleVersion; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.CoreMatchers.sameInstance; import static org.hamcrest.Matchers.arrayWithSize; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.is; @@ -424,7 +427,7 @@ public void testShardStarted() throws InterruptedException { final ShardRouting shardRouting = getRandomShardRouting(index); final long primaryTerm = clusterService.state().metadata().index(shardRouting.index()).primaryTerm(shardRouting.id()); final TestListener listener = new TestListener(); - shardStateAction.shardStarted(shardRouting, primaryTerm, "testShardStarted", listener); + shardStateAction.shardStarted(shardRouting, primaryTerm, "testShardStarted", ShardLongFieldRange.UNKNOWN, listener); final CapturingTransport.CapturedRequest[] capturedRequests = transport.getCapturedRequestsAndClear(); assertThat(capturedRequests[0].request, instanceOf(ShardStateAction.StartedShardEntry.class)); @@ -433,6 +436,7 @@ public void testShardStarted() throws InterruptedException { assertThat(entry.shardId, equalTo(shardRouting.shardId())); assertThat(entry.allocationId, equalTo(shardRouting.allocationId().getId())); assertThat(entry.primaryTerm, equalTo(primaryTerm)); + assertThat(entry.timestampMillisRange, sameInstance(ShardLongFieldRange.UNKNOWN)); transport.handleResponse(capturedRequests[0].requestId, TransportResponse.Empty.INSTANCE); listener.await(); @@ -485,7 +489,8 @@ public void testShardEntryBWCSerialize() throws Exception { final ShardId shardId = new ShardId(randomRealisticUnicodeOfLengthBetween(10, 100), UUID.randomUUID().toString(), between(0, 1000)); final String allocationId = randomRealisticUnicodeOfCodepointLengthBetween(10, 100); final String reason = randomRealisticUnicodeOfCodepointLengthBetween(10, 100); - try (StreamInput in = serialize(new StartedShardEntry(shardId, allocationId, 0L, reason), bwcVersion).streamInput()) { + final StartedShardEntry originalEntry = new StartedShardEntry(shardId, allocationId, 0L, reason, ShardLongFieldRange.UNKNOWN); + try (StreamInput in = serialize(originalEntry, bwcVersion).streamInput()) { in.setVersion(bwcVersion); final FailedShardEntry failedShardEntry = new FailedShardEntry(in); assertThat(failedShardEntry.shardId, equalTo(shardId)); @@ -539,7 +544,14 @@ public void testStartedShardEntrySerialization() throws Exception { final String message = randomRealisticUnicodeOfCodepointLengthBetween(10, 100); final Version version = randomFrom(randomCompatibleVersion(random(), Version.CURRENT)); - try (StreamInput in = serialize(new StartedShardEntry(shardId, allocationId, primaryTerm, message), version).streamInput()) { + final ShardLongFieldRange timestampMillisRange = ShardLongFieldRangeWireTests.randomRange(); + final StartedShardEntry startedShardEntry = new StartedShardEntry( + shardId, + allocationId, + primaryTerm, + message, + timestampMillisRange); + try (StreamInput in = serialize(startedShardEntry, version).streamInput()) { in.setVersion(version); final StartedShardEntry deserialized = new StartedShardEntry(in); assertThat(deserialized.shardId, equalTo(shardId)); @@ -550,6 +562,8 @@ public void testStartedShardEntrySerialization() throws Exception { assertThat(deserialized.primaryTerm, equalTo(0L)); } assertThat(deserialized.message, equalTo(message)); + assertThat(deserialized.timestampMillisRange, version.onOrAfter(ShardLongFieldRange.LONG_FIELD_RANGE_VERSION_INTRODUCED) ? + equalTo(timestampMillisRange) : sameInstance(ShardLongFieldRange.UNKNOWN)); } } diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/ToAndFromJsonMetadataTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/ToAndFromJsonMetadataTests.java index bb4dc637cf832..974d5a8373b29 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/ToAndFromJsonMetadataTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/ToAndFromJsonMetadataTests.java @@ -301,7 +301,10 @@ public void testToXContentAPI_SameTypeName() throws IOException { " \"0\" : [ ]\n" + " },\n" + " \"rollover_info\" : { },\n" + - " \"system\" : false\n" + + " \"system\" : false,\n" + + " \"timestamp_range\" : {\n" + + " \"shards\" : [ ]\n" + + " }\n" + " }\n" + " },\n" + " \"index-graveyard\" : {\n" + @@ -464,7 +467,10 @@ public void testToXContentAPI_FlatSettingTrue_ReduceMappingFalse() throws IOExce " \"time\" : 1\n" + " }\n" + " },\n" + - " \"system\" : false\n" + + " \"system\" : false,\n" + + " \"timestamp_range\" : {\n" + + " \"shards\" : [ ]\n" + + " }\n" + " }\n" + " },\n" + " \"index-graveyard\" : {\n" + @@ -570,7 +576,10 @@ public void testToXContentAPI_FlatSettingFalse_ReduceMappingTrue() throws IOExce " \"time\" : 1\n" + " }\n" + " },\n" + - " \"system\" : false\n" + + " \"system\" : false,\n" + + " \"timestamp_range\" : {\n" + + " \"shards\" : [ ]\n" + + " }\n" + " }\n" + " },\n" + " \"index-graveyard\" : {\n" + diff --git a/server/src/test/java/org/elasticsearch/index/mapper/DateFieldMapperTests.java b/server/src/test/java/org/elasticsearch/index/mapper/DateFieldMapperTests.java index 68e5b67ba4b60..29cdccdebfb0c 100644 --- a/server/src/test/java/org/elasticsearch/index/mapper/DateFieldMapperTests.java +++ b/server/src/test/java/org/elasticsearch/index/mapper/DateFieldMapperTests.java @@ -24,6 +24,7 @@ import org.elasticsearch.bootstrap.JavaVersion; import org.elasticsearch.common.collect.List; import org.elasticsearch.common.time.DateFormatter; +import org.elasticsearch.common.time.DateUtils; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.index.termvectors.TermVectorsService; import org.elasticsearch.search.DocValueFormat; @@ -34,6 +35,11 @@ import java.time.ZonedDateTime; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.lessThan; +import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.hamcrest.Matchers.notNullValue; public class DateFieldMapperTests extends MapperTestCase { @@ -328,4 +334,33 @@ public void testFetchDocValuesNanos() throws IOException { assertEquals(List.of(date), fetchFromDocValues(mapperService, ft, format, date)); assertEquals(List.of("2020-05-15T21:33:02.123Z"), fetchFromDocValues(mapperService, ft, format, 1589578382123L)); } + + public void testResolutionRounding() { + final long millis = randomLong(); + assertThat(DateFieldMapper.Resolution.MILLISECONDS.roundDownToMillis(millis), equalTo(millis)); + assertThat(DateFieldMapper.Resolution.MILLISECONDS.roundUpToMillis(millis), equalTo(millis)); + + final long nanos = randomNonNegativeLong(); + final long down = DateFieldMapper.Resolution.NANOSECONDS.roundDownToMillis(nanos); + assertThat(DateUtils.toNanoSeconds(down), lessThanOrEqualTo(nanos)); + try { + assertThat(DateUtils.toNanoSeconds(down + 1), greaterThan(nanos)); + } catch (IllegalArgumentException e) { + // ok, down+1 was out of range + } + + final long up = DateFieldMapper.Resolution.NANOSECONDS.roundUpToMillis(nanos); + try { + assertThat(DateUtils.toNanoSeconds(up), greaterThanOrEqualTo(nanos)); + } catch (IllegalArgumentException e) { + // ok, up may be out of range by 1; we check that up-1 is in range below (as long as it's >0) + assertThat(up, greaterThan(0L)); + } + + if (up > 0) { + assertThat(DateUtils.toNanoSeconds(up - 1), lessThan(nanos)); + } else { + assertThat(up, equalTo(0L)); + } + } } diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexLongFieldRangeTestUtils.java b/server/src/test/java/org/elasticsearch/index/shard/IndexLongFieldRangeTestUtils.java new file mode 100644 index 0000000000000..14597cfa8625d --- /dev/null +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexLongFieldRangeTestUtils.java @@ -0,0 +1,77 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.shard; + +import org.elasticsearch.test.ESTestCase; + +import static org.elasticsearch.test.ESTestCase.randomBoolean; +import static org.junit.Assert.assertSame; + +public class IndexLongFieldRangeTestUtils { + + static IndexLongFieldRange randomRange() { + switch (ESTestCase.between(1, 3)) { + case 1: + return IndexLongFieldRange.UNKNOWN; + case 2: + return IndexLongFieldRange.EMPTY; + case 3: + return randomSpecificRange(); + default: + throw new AssertionError("impossible"); + } + } + + static IndexLongFieldRange randomSpecificRange() { + return randomSpecificRange(null); + } + + static IndexLongFieldRange randomSpecificRange(Boolean complete) { + IndexLongFieldRange range = IndexLongFieldRange.NO_SHARDS; + + final int shardCount = ESTestCase.between(1, 5); + for (int i = 0; i < shardCount; i++) { + if (Boolean.FALSE.equals(complete) && range.getShards().length == shardCount - 1) { + // caller requested an incomplete range so we must skip the last shard + break; + } else if (Boolean.TRUE.equals(complete) || randomBoolean()) { + range = range.extendWithShardRange( + i, + shardCount, + randomBoolean() ? ShardLongFieldRange.EMPTY : ShardLongFieldRangeWireTests.randomSpecificRange()); + } + } + + assert range != IndexLongFieldRange.UNKNOWN; + assert complete == null || complete.equals(range.isComplete()); + return range; + } + + static boolean checkForSameInstances(IndexLongFieldRange expected, IndexLongFieldRange actual) { + final boolean expectSame = expected == IndexLongFieldRange.UNKNOWN + || expected == IndexLongFieldRange.EMPTY + || expected == IndexLongFieldRange.NO_SHARDS; + if (expectSame) { + assertSame(expected, actual); + } + return expectSame; + } + +} diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexLongFieldRangeTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexLongFieldRangeTests.java new file mode 100644 index 0000000000000..7a41b192424f6 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexLongFieldRangeTests.java @@ -0,0 +1,133 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.shard; + +import org.elasticsearch.test.ESTestCase; + +import java.util.Arrays; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.elasticsearch.index.shard.IndexLongFieldRangeTestUtils.randomSpecificRange; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.sameInstance; + +public class IndexLongFieldRangeTests extends ESTestCase { + + public void testUnknownShardImpliesUnknownIndex() { + final IndexLongFieldRange range = randomSpecificRange(false); + assertThat(range.extendWithShardRange( + IntStream.of(range.getShards()).max().orElse(0) + 1, + between(1, 10), + ShardLongFieldRange.UNKNOWN), + sameInstance(IndexLongFieldRange.UNKNOWN)); + } + + public void testExtendWithKnownShardIsNoOp() { + IndexLongFieldRange range = randomSpecificRange(); + if (range == IndexLongFieldRange.NO_SHARDS) { + // need at least one known shard + range = range.extendWithShardRange(between(0, 5), 5, ShardLongFieldRange.EMPTY); + } + + final ShardLongFieldRange shardRange; + if (range.getMinUnsafe() == IndexLongFieldRange.EMPTY.getMinUnsafe() + && range.getMaxUnsafe() == IndexLongFieldRange.EMPTY.getMaxUnsafe()) { + shardRange = ShardLongFieldRange.EMPTY; + } else { + final long min = randomLongBetween(range.getMinUnsafe(), range.getMaxUnsafe()); + final long max = randomLongBetween(min, range.getMaxUnsafe()); + shardRange = randomBoolean() ? ShardLongFieldRange.EMPTY : ShardLongFieldRange.of(min, max); + } + + assertThat(range.extendWithShardRange( + range.isComplete() ? between(1, 10) : randomFrom(IntStream.of(range.getShards()).boxed().collect(Collectors.toList())), + between(1, 10), + shardRange), + sameInstance(range)); + } + + public void testExtendUnknownRangeIsNoOp() { + assertThat(IndexLongFieldRange.UNKNOWN.extendWithShardRange( + between(0, 10), + between(0, 10), + ShardLongFieldRangeWireTests.randomRange()), + sameInstance(IndexLongFieldRange.UNKNOWN)); + } + + public void testCompleteEmptyRangeIsEmptyInstance() { + final int shardCount = between(1, 5); + IndexLongFieldRange range = IndexLongFieldRange.NO_SHARDS; + for (int i = 0; i < shardCount; i++) { + assertFalse(range.isComplete()); + range = range.extendWithShardRange(i, shardCount, ShardLongFieldRange.EMPTY); + } + assertThat(range, sameInstance(IndexLongFieldRange.EMPTY)); + assertTrue(range.isComplete()); + } + + public void testIsCompleteWhenAllShardRangesIncluded() { + final int shardCount = between(1, 5); + IndexLongFieldRange range = IndexLongFieldRange.NO_SHARDS; + long min = Long.MAX_VALUE; + long max = Long.MIN_VALUE; + for (int i = 0; i < shardCount; i++) { + assertFalse(range.isComplete()); + final ShardLongFieldRange shardFieldRange; + if (randomBoolean()) { + shardFieldRange = ShardLongFieldRange.EMPTY; + } else { + shardFieldRange = ShardLongFieldRangeWireTests.randomSpecificRange(); + min = Math.min(min, shardFieldRange.getMin()); + max = Math.max(max, shardFieldRange.getMax()); + } + range = range.extendWithShardRange( + i, + shardCount, + shardFieldRange); + } + assertTrue(range.isComplete()); + if (range != IndexLongFieldRange.EMPTY) { + assertThat(range.getMin(), equalTo(min)); + assertThat(range.getMax(), equalTo(max)); + } else { + assertThat(min, equalTo(Long.MAX_VALUE)); + assertThat(max, equalTo(Long.MIN_VALUE)); + } + } + + public void testCanRemoveShardRange() { + assertThat(IndexLongFieldRange.UNKNOWN.removeShard(between(0, 4), 5), sameInstance(IndexLongFieldRange.UNKNOWN)); + assertThat(IndexLongFieldRange.UNKNOWN.removeShard(0, 1), sameInstance(IndexLongFieldRange.NO_SHARDS)); + + final IndexLongFieldRange initialRange = randomSpecificRange(); + final int shardCount = initialRange.isComplete() + ? between(1, 5) : Arrays.stream(initialRange.getShards()).max().orElse(0) + between(1, 3); + + final int shard = between(0, shardCount - 1); + final IndexLongFieldRange rangeWithoutShard = initialRange.removeShard(shard, shardCount); + assertFalse(rangeWithoutShard.isComplete()); + assertTrue(Arrays.stream(rangeWithoutShard.getShards()).noneMatch(i -> i == shard)); + if (rangeWithoutShard != IndexLongFieldRange.NO_SHARDS) { + assertThat(rangeWithoutShard.getMinUnsafe(), equalTo(initialRange.getMinUnsafe())); + assertThat(rangeWithoutShard.getMaxUnsafe(), equalTo(initialRange.getMaxUnsafe())); + } + } +} diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexLongFieldRangeWireTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexLongFieldRangeWireTests.java new file mode 100644 index 0000000000000..81f5940fcafd2 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexLongFieldRangeWireTests.java @@ -0,0 +1,70 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.shard; + +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.test.AbstractWireSerializingTestCase; + +import java.io.IOException; +import java.util.Arrays; + +import static org.elasticsearch.index.shard.IndexLongFieldRangeTestUtils.checkForSameInstances; +import static org.elasticsearch.index.shard.IndexLongFieldRangeTestUtils.randomRange; + +public class IndexLongFieldRangeWireTests extends AbstractWireSerializingTestCase { + @Override + protected Writeable.Reader instanceReader() { + return IndexLongFieldRange::readFrom; + } + + @Override + protected IndexLongFieldRange createTestInstance() { + return randomRange(); + } + + @Override + protected IndexLongFieldRange mutateInstance(IndexLongFieldRange instance) throws IOException { + if (instance == IndexLongFieldRange.UNKNOWN) { + return IndexLongFieldRangeTestUtils.randomSpecificRange(); + } + + if (randomBoolean()) { + return IndexLongFieldRange.UNKNOWN; + } + + while (true) { + final IndexLongFieldRange newInstance = IndexLongFieldRangeTestUtils.randomSpecificRange(); + if (newInstance.getMinUnsafe() != instance.getMinUnsafe() + || newInstance.getMaxUnsafe() != instance.getMaxUnsafe() + || Arrays.equals(newInstance.getShards(), instance.getShards()) == false) { + return newInstance; + } + } + } + + + @Override + protected void assertEqualInstances(IndexLongFieldRange expectedInstance, IndexLongFieldRange newInstance) { + if (checkForSameInstances(expectedInstance, newInstance) == false) { + super.assertEqualInstances(expectedInstance, newInstance); + } + } + +} diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexLongFieldRangeXContentTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexLongFieldRangeXContentTests.java new file mode 100644 index 0000000000000..f48346950e6bc --- /dev/null +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexLongFieldRangeXContentTests.java @@ -0,0 +1,54 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.shard; + +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.test.AbstractXContentTestCase; + +import java.io.IOException; + +import static org.elasticsearch.index.shard.IndexLongFieldRangeTestUtils.checkForSameInstances; +import static org.elasticsearch.index.shard.IndexLongFieldRangeTestUtils.randomRange; +import static org.hamcrest.Matchers.sameInstance; + +public class IndexLongFieldRangeXContentTests extends AbstractXContentTestCase { + @Override + protected IndexLongFieldRange createTestInstance() { + return randomRange(); + } + + @Override + protected IndexLongFieldRange doParseInstance(XContentParser parser) throws IOException { + assertThat(parser.nextToken(), sameInstance(XContentParser.Token.START_OBJECT)); + return IndexLongFieldRange.fromXContent(parser); + } + + @Override + protected boolean supportsUnknownFields() { + return false; + } + + @Override + protected void assertEqualInstances(IndexLongFieldRange expectedInstance, IndexLongFieldRange newInstance) { + if (checkForSameInstances(expectedInstance, newInstance) == false) { + super.assertEqualInstances(expectedInstance, newInstance); + } + } +} diff --git a/server/src/test/java/org/elasticsearch/index/shard/ShardLongFieldRangeWireTests.java b/server/src/test/java/org/elasticsearch/index/shard/ShardLongFieldRangeWireTests.java new file mode 100644 index 0000000000000..ffd523c4c2377 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/index/shard/ShardLongFieldRangeWireTests.java @@ -0,0 +1,94 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.shard; + +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.test.AbstractWireSerializingTestCase; + +import java.io.IOException; + +public class ShardLongFieldRangeWireTests extends AbstractWireSerializingTestCase { + @Override + protected Writeable.Reader instanceReader() { + return ShardLongFieldRange::readFrom; + } + + @Override + protected ShardLongFieldRange createTestInstance() { + return randomRange(); + } + + public static ShardLongFieldRange randomRange() { + switch (between(1, 3)) { + case 1: + return ShardLongFieldRange.UNKNOWN; + case 2: + return ShardLongFieldRange.EMPTY; + case 3: + return randomSpecificRange(); + default: + throw new AssertionError("impossible"); + } + } + + static ShardLongFieldRange randomSpecificRange() { + final long min = randomLong(); + return ShardLongFieldRange.of(min, randomLongBetween(min, Long.MAX_VALUE)); + } + + @Override + protected ShardLongFieldRange mutateInstance(ShardLongFieldRange instance) throws IOException { + if (instance == ShardLongFieldRange.UNKNOWN) { + return randomBoolean() ? ShardLongFieldRange.EMPTY : randomSpecificRange(); + } + if (instance == ShardLongFieldRange.EMPTY) { + return randomBoolean() ? ShardLongFieldRange.UNKNOWN : randomSpecificRange(); + } + + switch (between(1, 4)) { + case 1: + return ShardLongFieldRange.UNKNOWN; + case 2: + return ShardLongFieldRange.EMPTY; + case 3: + return instance.getMin() == Long.MAX_VALUE + ? randomSpecificRange() + : ShardLongFieldRange.of(randomValueOtherThan(instance.getMin(), + () -> randomLongBetween(Long.MIN_VALUE, instance.getMax())), instance.getMax()); + case 4: + return instance.getMax() == Long.MIN_VALUE + ? randomSpecificRange() + : ShardLongFieldRange.of(instance.getMin(), randomValueOtherThan(instance.getMax(), + () -> randomLongBetween(instance.getMin(), Long.MAX_VALUE))); + default: + throw new AssertionError("impossible"); + } + } + + @Override + protected void assertEqualInstances(ShardLongFieldRange expectedInstance, ShardLongFieldRange newInstance) { + if (expectedInstance == ShardLongFieldRange.UNKNOWN || expectedInstance == ShardLongFieldRange.EMPTY) { + assertSame(expectedInstance, newInstance); + } else { + super.assertEqualInstances(expectedInstance, newInstance); + } + } + +} diff --git a/server/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java b/server/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java index 5e70f3fd735ae..70b1bae30dcbc 100644 --- a/server/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java +++ b/server/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java @@ -36,6 +36,7 @@ import org.elasticsearch.index.shard.IndexEventListener; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardState; +import org.elasticsearch.index.shard.ShardLongFieldRange; import org.elasticsearch.index.shard.PrimaryReplicaSyncer.ResyncTask; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndicesService; @@ -407,5 +408,11 @@ public void updateTerm(long newTerm) { } this.term = newTerm; } + + @Override + public ShardLongFieldRange getTimestampMillisRange() { + return ShardLongFieldRange.EMPTY; + } + } } diff --git a/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java b/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java index b1fa969dde889..8e867859a1ce8 100644 --- a/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java +++ b/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java @@ -89,6 +89,7 @@ import org.elasticsearch.index.IndexService; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.shard.IndexEventListener; +import org.elasticsearch.index.shard.ShardLongFieldRange; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.ShardLimitValidator; import org.elasticsearch.indices.SystemIndices; @@ -311,7 +312,12 @@ public ClusterState applyStartedShards(ClusterState clusterState, List startedShards) { return runTasks(shardStartedClusterStateTaskExecutor, clusterState, startedShards.entrySet().stream() - .map(e -> new StartedShardEntry(e.getKey().shardId(), e.getKey().allocationId().getId(), e.getValue(), "shard started")) + .map(e -> new StartedShardEntry( + e.getKey().shardId(), + e.getKey().allocationId().getId(), + e.getValue(), + "shard started", + ShardLongFieldRange.UNKNOWN)) .collect(Collectors.toList())); } diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java index ba8a8f2b1c6e5..4e0b87a389769 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java @@ -52,6 +52,7 @@ import org.elasticsearch.index.replication.RecoveryDuringReplicationTests; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.ShardLongFieldRange; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.translog.SnapshotMatchers; import org.elasticsearch.index.translog.Translog; @@ -451,7 +452,7 @@ public long addDocument(Iterable doc) throws IOExcepti expectThrows(Exception.class, () -> group.recoverReplica(replica, (shard, sourceNode) -> new RecoveryTarget(shard, sourceNode, new PeerRecoveryTargetService.RecoveryListener() { @Override - public void onRecoveryDone(RecoveryState state) { + public void onRecoveryDone(RecoveryState state, ShardLongFieldRange timestampMillisFieldRange) { throw new AssertionError("recovery must fail"); } diff --git a/server/src/test/java/org/elasticsearch/recovery/RecoveriesCollectionTests.java b/server/src/test/java/org/elasticsearch/recovery/RecoveriesCollectionTests.java index a3b908fae5c9d..b58bd3fbf6396 100644 --- a/server/src/test/java/org/elasticsearch/recovery/RecoveriesCollectionTests.java +++ b/server/src/test/java/org/elasticsearch/recovery/RecoveriesCollectionTests.java @@ -23,6 +23,7 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.replication.ESIndexLevelReplicationTestCase; import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.ShardLongFieldRange; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.store.Store; import org.elasticsearch.indices.recovery.RecoveriesCollection; @@ -41,7 +42,7 @@ public class RecoveriesCollectionTests extends ESIndexLevelReplicationTestCase { static final PeerRecoveryTargetService.RecoveryListener listener = new PeerRecoveryTargetService.RecoveryListener() { @Override - public void onRecoveryDone(RecoveryState state) { + public void onRecoveryDone(RecoveryState state, ShardLongFieldRange timestampMillisFieldRange) { } @@ -76,7 +77,7 @@ public void testRecoveryTimeout() throws Exception { final long recoveryId = startRecovery(collection, shards.getPrimaryNode(), shards.addReplica(), new PeerRecoveryTargetService.RecoveryListener() { @Override - public void onRecoveryDone(RecoveryState state) { + public void onRecoveryDone(RecoveryState state, ShardLongFieldRange timestampMillisFieldRange) { latch.countDown(); } diff --git a/test/framework/src/main/java/org/elasticsearch/action/support/replication/ClusterStateCreationUtils.java b/test/framework/src/main/java/org/elasticsearch/action/support/replication/ClusterStateCreationUtils.java index bdc101977afc0..a0faed79d6dbc 100644 --- a/test/framework/src/main/java/org/elasticsearch/action/support/replication/ClusterStateCreationUtils.java +++ b/test/framework/src/main/java/org/elasticsearch/action/support/replication/ClusterStateCreationUtils.java @@ -38,6 +38,7 @@ import org.elasticsearch.cluster.routing.TestShardRouting; import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.shard.IndexLongFieldRange; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.test.ESTestCase; @@ -97,6 +98,8 @@ public static ClusterState state(String index, boolean activePrimaryLocal, Shard .put(SETTING_VERSION_CREATED, Version.CURRENT) .put(SETTING_NUMBER_OF_SHARDS, 1).put(SETTING_NUMBER_OF_REPLICAS, numberOfReplicas) .put(SETTING_CREATION_DATE, System.currentTimeMillis())).primaryTerm(0, primaryTerm) + .timestampMillisRange(primaryState == ShardRoutingState.STARTED || primaryState == ShardRoutingState.RELOCATING + ? IndexLongFieldRange.UNKNOWN : IndexLongFieldRange.NO_SHARDS) .build(); IndexShardRoutingTable.Builder indexShardRoutingBuilder = new IndexShardRoutingTable.Builder(shardId); @@ -300,6 +303,7 @@ public static ClusterState stateWithAssignedPrimariesAndReplicas(String[] indice IndexMetadata indexMetadata = IndexMetadata.builder(index) .settings(Settings.builder().put(SETTING_VERSION_CREATED, Version.CURRENT).put(SETTING_NUMBER_OF_SHARDS, numberOfShards) .put(SETTING_NUMBER_OF_REPLICAS, numberOfReplicas).put(SETTING_CREATION_DATE, System.currentTimeMillis())) + .timestampMillisRange(IndexLongFieldRange.UNKNOWN) .build(); metadataBuilder.put(indexMetadata, false).generateClusterUuidIfNeeded(); IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(indexMetadata.getIndex()); diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java index 54f46f2e7ec03..979020ddd2640 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java @@ -123,7 +123,7 @@ public abstract class IndexShardTestCase extends ESTestCase { protected static final PeerRecoveryTargetService.RecoveryListener recoveryListener = new PeerRecoveryTargetService.RecoveryListener() { @Override - public void onRecoveryDone(RecoveryState state) { + public void onRecoveryDone(RecoveryState state, ShardLongFieldRange timestampMillisFieldRange) { } diff --git a/x-pack/plugin/frozen-indices/src/internalClusterTest/java/org/elasticsearch/index/engine/FrozenIndexIT.java b/x-pack/plugin/frozen-indices/src/internalClusterTest/java/org/elasticsearch/index/engine/FrozenIndexIT.java new file mode 100644 index 0000000000000..4143fe08f64aa --- /dev/null +++ b/x-pack/plugin/frozen-indices/src/internalClusterTest/java/org/elasticsearch/index/engine/FrozenIndexIT.java @@ -0,0 +1,100 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.index.engine; + +import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.action.support.ActiveShardCount; +import org.elasticsearch.cluster.metadata.DataStream; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.routing.allocation.command.AllocateStalePrimaryAllocationCommand; +import org.elasticsearch.cluster.routing.allocation.command.CancelAllocationCommand; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.shard.IndexLongFieldRange; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.protocol.xpack.frozen.FreezeRequest; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.test.InternalTestCluster; +import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin; +import org.elasticsearch.xpack.core.frozen.action.FreezeIndexAction; +import org.elasticsearch.xpack.frozen.FrozenIndices; +import org.joda.time.Instant; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; + +import static org.elasticsearch.cluster.metadata.IndexMetadata.INDEX_ROUTING_EXCLUDE_GROUP_SETTING; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.sameInstance; + +@ESIntegTestCase.ClusterScope(numDataNodes = 0, transportClientRatio = 0) +public class FrozenIndexIT extends ESIntegTestCase { + + @Override + protected Collection> nodePlugins() { + return org.elasticsearch.common.collect.List.of(FrozenIndices.class, LocalStateCompositeXPackPlugin.class); + } + + @Override + protected boolean addMockInternalEngine() { + return false; + } + + public void testTimestampRangeRecalculatedOnStalePrimaryAllocation() throws IOException { + final List nodeNames = internalCluster().startNodes(2); + + createIndex("index", Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) + .build()); + + final IndexResponse indexResponse = client().prepareIndex("index", "_doc") + .setSource(DataStream.TimestampField.FIXED_TIMESTAMP_FIELD, "2010-01-06T02:03:04.567Z").get(); + + ensureGreen("index"); + + assertThat(client().admin().indices().prepareFlush("index").get().getSuccessfulShards(), equalTo(2)); + assertThat(client().admin().indices().prepareRefresh("index").get().getSuccessfulShards(), equalTo(2)); + + final String excludeSetting = INDEX_ROUTING_EXCLUDE_GROUP_SETTING.getConcreteSettingForNamespace("_name").getKey(); + assertAcked(client().admin().indices().prepareUpdateSettings("index").setSettings( + Settings.builder().put(excludeSetting, nodeNames.get(0)))); + assertAcked(client().admin().cluster().prepareReroute().add(new CancelAllocationCommand("index", 0, nodeNames.get(0), true))); + assertThat(client().admin().cluster().prepareHealth("index").get().getUnassignedShards(), equalTo(1)); + + assertThat(client().prepareDelete("index", "_doc", indexResponse.getId()).get().status(), equalTo(RestStatus.OK)); + + assertAcked(client().execute(FreezeIndexAction.INSTANCE, + new FreezeRequest("index").waitForActiveShards(ActiveShardCount.ONE)).actionGet()); + + assertThat(client().admin().cluster().prepareState().get().getState().metadata().index("index").getTimestampMillisRange(), + sameInstance(IndexLongFieldRange.EMPTY)); + + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodeNames.get(1))); + assertThat(client().admin().cluster().prepareHealth("index").get().getUnassignedShards(), equalTo(2)); + assertAcked(client().admin().indices().prepareUpdateSettings("index") + .setSettings(Settings.builder().putNull(excludeSetting))); + assertThat(client().admin().cluster().prepareHealth("index").get().getUnassignedShards(), equalTo(2)); + + assertAcked(client().admin().cluster().prepareReroute().add( + new AllocateStalePrimaryAllocationCommand("index", 0, nodeNames.get(0), true))); + + ensureYellowAndNoInitializingShards("index"); + + final IndexLongFieldRange timestampFieldRange + = client().admin().cluster().prepareState().get().getState().metadata().index("index").getTimestampMillisRange(); + assertThat(timestampFieldRange, not(sameInstance(IndexLongFieldRange.UNKNOWN))); + assertThat(timestampFieldRange, not(sameInstance(IndexLongFieldRange.EMPTY))); + assertTrue(timestampFieldRange.isComplete()); + assertThat(timestampFieldRange.getMin(), equalTo(Instant.parse("2010-01-06T02:03:04.567Z").getMillis())); + assertThat(timestampFieldRange.getMax(), equalTo(Instant.parse("2010-01-06T02:03:04.567Z").getMillis())); + } + +} diff --git a/x-pack/plugin/frozen-indices/src/internalClusterTest/java/org/elasticsearch/index/engine/FrozenIndexTests.java b/x-pack/plugin/frozen-indices/src/internalClusterTest/java/org/elasticsearch/index/engine/FrozenIndexTests.java index f54acf87321e9..5a4e8dcf63a90 100644 --- a/x-pack/plugin/frozen-indices/src/internalClusterTest/java/org/elasticsearch/index/engine/FrozenIndexTests.java +++ b/x-pack/plugin/frozen-indices/src/internalClusterTest/java/org/elasticsearch/index/engine/FrozenIndexTests.java @@ -8,17 +8,15 @@ import org.elasticsearch.action.OriginalIndices; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; +import org.elasticsearch.action.admin.indices.stats.ShardStats; import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.index.IndexResponse; -import org.elasticsearch.search.builder.PointInTimeBuilder; -import org.elasticsearch.xpack.core.XPackPlugin; -import org.elasticsearch.xpack.core.search.action.ClosePointInTimeAction; -import org.elasticsearch.xpack.core.search.action.ClosePointInTimeRequest; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchType; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.routing.RecoverySource; import org.elasticsearch.common.Strings; @@ -30,6 +28,7 @@ import org.elasticsearch.index.IndexService; import org.elasticsearch.index.query.MatchAllQueryBuilder; import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.index.shard.IndexLongFieldRange; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardTestCase; import org.elasticsearch.indices.IndicesService; @@ -38,16 +37,22 @@ import org.elasticsearch.protocol.xpack.frozen.FreezeRequest; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.SearchService; +import org.elasticsearch.search.builder.PointInTimeBuilder; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.internal.AliasFilter; import org.elasticsearch.search.internal.ShardSearchRequest; import org.elasticsearch.test.ESSingleNodeTestCase; +import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin; +import org.elasticsearch.xpack.core.frozen.action.FreezeIndexAction; +import org.elasticsearch.xpack.core.search.action.ClosePointInTimeAction; +import org.elasticsearch.xpack.core.search.action.ClosePointInTimeRequest; import org.elasticsearch.xpack.core.search.action.OpenPointInTimeAction; import org.elasticsearch.xpack.core.search.action.OpenPointInTimeRequest; import org.elasticsearch.xpack.core.search.action.OpenPointInTimeResponse; import org.elasticsearch.xpack.core.XPackClient; import org.elasticsearch.xpack.frozen.FrozenIndices; import org.hamcrest.Matchers; +import org.joda.time.Instant; import java.io.IOException; import java.util.Arrays; @@ -62,13 +67,15 @@ import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.sameInstance; public class FrozenIndexTests extends ESSingleNodeTestCase { @Override protected Collection> getPlugins() { - return pluginList(FrozenIndices.class, XPackPlugin.class); + return pluginList(FrozenIndices.class, LocalStateCompositeXPackPlugin.class); } String openReaders(TimeValue keepAlive, String... indices) { @@ -190,10 +197,13 @@ public void testSearchAndGetAPIsAreThrottled() throws Exception { } public void testFreezeAndUnfreeze() throws ExecutionException, InterruptedException { - createIndex("index", Settings.builder().put("index.number_of_shards", 2).build()); - client().prepareIndex("index", "_doc", "1").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get(); - client().prepareIndex("index", "_doc", "2").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get(); - client().prepareIndex("index", "_doc", "3").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get(); + final IndexService originalIndexService = createIndex("index", Settings.builder().put("index.number_of_shards", 2).build()); + assertThat(originalIndexService.getMetadata().getTimestampMillisRange(), sameInstance(IndexLongFieldRange.UNKNOWN)); + + client().prepareIndex("index", "_doc", "1").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get(); + client().prepareIndex("index", "_doc", "2").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get(); + client().prepareIndex("index", "_doc", "3").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get(); + if (randomBoolean()) { // sometimes close it assertAcked(client().admin().indices().prepareClose("index").get()); @@ -207,6 +217,7 @@ public void testFreezeAndUnfreeze() throws ExecutionException, InterruptedExcept assertTrue(indexService.getIndexSettings().isSearchThrottled()); IndexShard shard = indexService.getShard(0); assertEquals(0, shard.refreshStats().getTotal()); + assertThat(indexService.getMetadata().getTimestampMillisRange(), sameInstance(IndexLongFieldRange.UNKNOWN)); } assertAcked(xPackClient.freeze(new FreezeRequest("index").setFreeze(false))); { @@ -217,6 +228,7 @@ public void testFreezeAndUnfreeze() throws ExecutionException, InterruptedExcept IndexShard shard = indexService.getShard(0); Engine engine = IndexShardTestCase.getEngine(shard); assertThat(engine, Matchers.instanceOf(InternalEngine.class)); + assertThat(indexService.getMetadata().getTimestampMillisRange(), sameInstance(IndexLongFieldRange.UNKNOWN)); } client().prepareIndex("index", "_doc", "4").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get(); } @@ -333,7 +345,7 @@ public void testCanMatch() throws IOException, ExecutionException, InterruptedEx new AliasFilter(null, Strings.EMPTY_ARRAY), 1f, -1, null, null)).canMatch()); IndicesStatsResponse response = client().admin().indices().prepareStats("index").clear().setRefresh(true).get(); - assertEquals(0, response.getTotal().refresh.getTotal()); // never opened a reader + assertEquals(0, response.getTotal().refresh.getTotal()); } } @@ -484,4 +496,61 @@ public void testTranslogStats() throws Exception { equalTo(indexService.getIndexSettings().isSoftDeleteEnabled() ? 0 : nbDocs)); assertThat(stats.getIndex(indexName).getPrimaries().getTranslog().getUncommittedOperations(), equalTo(0)); } + + public void testComputesTimestampRangeFromMilliseconds() { + final int shardCount = between(1, 3); + createIndex("index", Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, shardCount).build()); + final String timestampField = DataStream.TimestampField.FIXED_TIMESTAMP_FIELD; + client().prepareIndex("index", "_doc").setSource(timestampField, "2010-01-05T01:02:03.456Z").get(); + client().prepareIndex("index", "_doc").setSource(timestampField, "2010-01-06T02:03:04.567Z").get(); + + assertAcked(client().execute(FreezeIndexAction.INSTANCE, new FreezeRequest("index")).actionGet()); + + final IndexLongFieldRange timestampFieldRange + = client().admin().cluster().prepareState().get().getState().metadata().index("index").getTimestampMillisRange(); + assertThat(timestampFieldRange, not(sameInstance(IndexLongFieldRange.UNKNOWN))); + assertThat(timestampFieldRange, not(sameInstance(IndexLongFieldRange.EMPTY))); + assertTrue(timestampFieldRange.isComplete()); + assertThat(timestampFieldRange.getMin(), equalTo(Instant.parse("2010-01-05T01:02:03.456Z").getMillis())); + assertThat(timestampFieldRange.getMax(), equalTo(Instant.parse("2010-01-06T02:03:04.567Z").getMillis())); + + for (ShardStats shardStats : client().admin().indices().prepareStats("index").clear().setRefresh(true).get().getShards()) { + assertThat("shard " + shardStats.getShardRouting() + " refreshed to get the timestamp range", + shardStats.getStats().refresh.getTotal(), greaterThanOrEqualTo(1L)); + } + } + + public void testComputesTimestampRangeFromNanoseconds() throws IOException { + + final String timestampField = DataStream.TimestampField.FIXED_TIMESTAMP_FIELD; + final XContentBuilder mapping = XContentFactory.jsonBuilder().startObject() + .startObject("properties") + .startObject(timestampField) + .field("type", "date_nanos") + .field("format", "strict_date_optional_time_nanos") + .endObject() + .endObject() + .endObject(); + + final int shardCount = between(1, 3); + createIndex("index", Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, shardCount).build(), "_doc", mapping); + client().prepareIndex("index", "_doc").setSource(timestampField, "2010-01-05T01:02:03.456789012Z").get(); + client().prepareIndex("index", "_doc").setSource(timestampField, "2010-01-06T02:03:04.567890123Z").get(); + + assertAcked(client().execute(FreezeIndexAction.INSTANCE, new FreezeRequest("index")).actionGet()); + + final IndexLongFieldRange timestampFieldRange + = client().admin().cluster().prepareState().get().getState().metadata().index("index").getTimestampMillisRange(); + assertThat(timestampFieldRange, not(sameInstance(IndexLongFieldRange.UNKNOWN))); + assertThat(timestampFieldRange, not(sameInstance(IndexLongFieldRange.EMPTY))); + assertTrue(timestampFieldRange.isComplete()); + assertThat(timestampFieldRange.getMin(), equalTo(Instant.parse("2010-01-05T01:02:03.456Z").getMillis())); + assertThat(timestampFieldRange.getMax(), equalTo(Instant.parse("2010-01-06T02:03:04.568Z").getMillis())); + + for (ShardStats shardStats : client().admin().indices().prepareStats("index").clear().setRefresh(true).get().getShards()) { + assertThat("shard " + shardStats.getShardRouting() + " refreshed to get the timestamp range", + shardStats.getStats().refresh.getTotal(), greaterThanOrEqualTo(1L)); + } + } + } diff --git a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsIntegTests.java b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsIntegTests.java index 001993c283001..1209842a697ac 100644 --- a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsIntegTests.java +++ b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsIntegTests.java @@ -19,6 +19,7 @@ import org.elasticsearch.action.admin.indices.stats.ShardStats; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.ShardRouting; @@ -28,11 +29,13 @@ import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.concurrent.AtomicArray; +import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.env.Environment; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexModule; import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.shard.IndexLongFieldRange; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardPath; import org.elasticsearch.indices.IndicesService; @@ -49,6 +52,7 @@ import org.elasticsearch.xpack.searchablesnapshots.action.SearchableSnapshotsStatsResponse; import org.elasticsearch.xpack.searchablesnapshots.cache.CacheService; import org.hamcrest.Matchers; +import org.joda.time.Instant; import java.io.IOException; import java.nio.file.Files; @@ -83,6 +87,8 @@ import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.lessThanOrEqualTo; +import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.sameInstance; public class SearchableSnapshotsIntegTests extends BaseSearchableSnapshotsIntegTestCase { @@ -132,6 +138,21 @@ public void testCreateAndRestoreSearchableSnapshot() throws Exception { assertShardFolders(indexName, false); + assertThat( + client().admin() + .cluster() + .prepareState() + .clear() + .setMetadata(true) + .setIndices(indexName) + .get() + .getState() + .metadata() + .index(indexName) + .getTimestampMillisRange(), + sameInstance(IndexLongFieldRange.UNKNOWN) + ); + final boolean deletedBeforeMount = randomBoolean(); if (deletedBeforeMount) { assertAcked(client().admin().indices().prepareDelete(indexName)); @@ -214,6 +235,21 @@ public void testCreateAndRestoreSearchableSnapshot() throws Exception { ensureGreen(restoredIndexName); assertShardFolders(restoredIndexName, true); + assertThat( + client().admin() + .cluster() + .prepareState() + .clear() + .setMetadata(true) + .setIndices(restoredIndexName) + .get() + .getState() + .metadata() + .index(restoredIndexName) + .getTimestampMillisRange(), + sameInstance(IndexLongFieldRange.UNKNOWN) + ); + if (deletedBeforeMount) { assertThat(client().admin().indices().prepareGetAliases(aliasName).get().getAliases().size(), equalTo(0)); assertAcked(client().admin().indices().prepareAliases().addAlias(restoredIndexName, aliasName)); @@ -668,6 +704,89 @@ public void testSnapshotMountedIndexLeavesBlobsUntouched() throws Exception { assertThat(snapshotTwoStatus.getStats().getProcessedFileCount(), equalTo(numShards)); // one segment_N per shard } + public void testSnapshotMountedIndexWithTimestampsRecordsTimestampRangeInIndexMetadata() throws Exception { + final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); + final int numShards = between(1, 3); + + assertAcked( + client().admin() + .indices() + .prepareCreate(indexName) + .addMapping( + "_doc", + XContentFactory.jsonBuilder() + .startObject() + .startObject("properties") + .startObject(DataStream.TimestampField.FIXED_TIMESTAMP_FIELD) + .field("type", "date_nanos") + .field("format", "strict_date_optional_time_nanos") + .endObject() + .endObject() + .endObject() + ) + .setSettings(indexSettingsNoReplicas(numShards).put(INDEX_SOFT_DELETES_SETTING.getKey(), true)) + ); + ensureGreen(indexName); + + final List indexRequestBuilders = new ArrayList<>(); + final int docCount = between(0, 1000); + for (int i = 0; i < docCount; i++) { + indexRequestBuilders.add( + client().prepareIndex(indexName, "_doc") + .setSource( + DataStream.TimestampField.FIXED_TIMESTAMP_FIELD, + String.format( + Locale.ROOT, + "2020-11-26T%02d:%02d:%02d.%09dZ", + between(0, 23), + between(0, 59), + between(0, 59), + randomLongBetween(0, 999999999L) + ) + ) + ); + } + indexRandom(true, false, indexRequestBuilders); + assertThat( + client().admin().indices().prepareForceMerge(indexName).setOnlyExpungeDeletes(true).setFlush(true).get().getFailedShards(), + equalTo(0) + ); + refresh(indexName); + forceMerge(); + + final String repositoryName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); + createRepository(repositoryName, "fs"); + + final SnapshotId snapshotOne = createSnapshot(repositoryName, "snapshot-1", org.elasticsearch.common.collect.List.of(indexName)) + .snapshotId(); + assertAcked(client().admin().indices().prepareDelete(indexName)); + + mountSnapshot(repositoryName, snapshotOne.getName(), indexName, indexName, Settings.EMPTY); + ensureGreen(indexName); + + final IndexLongFieldRange timestampMillisRange = client().admin() + .cluster() + .prepareState() + .clear() + .setMetadata(true) + .setIndices(indexName) + .get() + .getState() + .metadata() + .index(indexName) + .getTimestampMillisRange(); + + assertTrue(timestampMillisRange.isComplete()); + assertThat(timestampMillisRange, not(sameInstance(IndexLongFieldRange.UNKNOWN))); + if (docCount == 0) { + assertThat(timestampMillisRange, sameInstance(IndexLongFieldRange.EMPTY)); + } else { + assertThat(timestampMillisRange, not(sameInstance(IndexLongFieldRange.EMPTY))); + assertThat(timestampMillisRange.getMin(), greaterThanOrEqualTo(Instant.parse("2020-11-26T00:00:00Z").getMillis())); + assertThat(timestampMillisRange.getMin(), lessThanOrEqualTo(Instant.parse("2020-11-27T00:00:00Z").getMillis())); + } + } + private void assertTotalHits(String indexName, TotalHits originalAllHits, TotalHits originalBarHits) throws Exception { final Thread[] threads = new Thread[between(1, 5)]; final AtomicArray allHits = new AtomicArray<>(threads.length);