From 2c8957c9f9340f58a191f0111201dee135a7cd4c Mon Sep 17 00:00:00 2001 From: David Turner Date: Tue, 1 Dec 2020 15:41:48 +0000 Subject: [PATCH] Record timestamp field range in index metadata Queries including a filter by timestamp range are common in time-series data. Moreover older time-series indices are typically made read-only so that the timestamp range becomes immutable. By recording in the index metadata the range of timestamps covered by each index we can very efficiently skip shards on the coordinating node, even if those shards are not assigned. This commit computes the timestamp range of immutable indices and records it in the index metadata as the shards start for the first time. Note that the only indices it considers immutable today are ones using the `ReadOnlyEngine`, which includes frozen indices and searchable snapshots but not regular indices with a write block. Backport of #65564 and #65720 --- .../action/shard/ShardStateAction.java | 69 +++- .../cluster/metadata/IndexMetadata.java | 39 +- .../metadata/MetadataIndexStateService.java | 3 + .../allocation/IndexMetadataUpdater.java | 2 + .../elasticsearch/index/engine/Engine.java | 9 + .../index/engine/InternalEngine.java | 6 + .../index/engine/ReadOnlyEngine.java | 26 ++ .../index/mapper/DateFieldMapper.java | 37 +- .../index/shard/IndexLongFieldRange.java | 370 ++++++++++++++++++ .../elasticsearch/index/shard/IndexShard.java | 40 +- .../index/shard/ShardLongFieldRange.java | 141 +++++++ .../cluster/IndicesClusterStateService.java | 28 +- .../recovery/PeerRecoveryTargetService.java | 3 +- .../indices/recovery/RecoveryTarget.java | 2 +- .../snapshots/RestoreService.java | 5 +- .../reroute/ClusterRerouteResponseTests.java | 10 +- .../cluster/ClusterStateTests.java | 20 +- ...dStartedClusterStateTaskExecutorTests.java | 92 ++++- .../action/shard/ShardStateActionTests.java | 20 +- .../metadata/ToAndFromJsonMetadataTests.java | 15 +- .../index/mapper/DateFieldMapperTests.java | 35 ++ .../shard/IndexLongFieldRangeTestUtils.java | 77 ++++ .../index/shard/IndexLongFieldRangeTests.java | 133 +++++++ .../shard/IndexLongFieldRangeWireTests.java | 70 ++++ .../IndexLongFieldRangeXContentTests.java | 54 +++ .../shard/ShardLongFieldRangeWireTests.java | 94 +++++ ...actIndicesClusterStateServiceTestCase.java | 7 + .../indices/cluster/ClusterStateChanges.java | 8 +- .../indices/recovery/RecoveryTests.java | 3 +- .../recovery/RecoveriesCollectionTests.java | 5 +- .../ClusterStateCreationUtils.java | 4 + .../index/shard/IndexShardTestCase.java | 2 +- .../index/engine/FrozenIndexIT.java | 100 +++++ .../index/engine/FrozenIndexTests.java | 89 ++++- .../SearchableSnapshotsIntegTests.java | 119 ++++++ 35 files changed, 1683 insertions(+), 54 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/index/shard/IndexLongFieldRange.java create mode 100644 server/src/main/java/org/elasticsearch/index/shard/ShardLongFieldRange.java create mode 100644 server/src/test/java/org/elasticsearch/index/shard/IndexLongFieldRangeTestUtils.java create mode 100644 server/src/test/java/org/elasticsearch/index/shard/IndexLongFieldRangeTests.java create mode 100644 server/src/test/java/org/elasticsearch/index/shard/IndexLongFieldRangeWireTests.java create mode 100644 server/src/test/java/org/elasticsearch/index/shard/IndexLongFieldRangeXContentTests.java create mode 100644 server/src/test/java/org/elasticsearch/index/shard/ShardLongFieldRangeWireTests.java create mode 100644 
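For context, the coordinating-node skipping that this metadata enables is implemented at search time rather than in this backport; the sketch below is a hypothetical helper (the `TimestampRangeSkipper` class and `canSkip` signature are invented for illustration) showing how a complete recorded range could be tested against a `@timestamp` range filter using only the `IndexMetadata#getTimestampMillisRange` accessor and the `IndexLongFieldRange` sentinels introduced in this commit. Because the range lives in the cluster metadata, the check works even when the index's shards are not assigned.

```java
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.index.shard.IndexLongFieldRange;

// Hypothetical helper, not part of this patch: decide whether an index can be
// skipped for an inclusive [fromMillis, toMillis] filter on @timestamp.
public final class TimestampRangeSkipper {

    public static boolean canSkip(IndexMetadata indexMetadata, long fromMillis, long toMillis) {
        final IndexLongFieldRange range = indexMetadata.getTimestampMillisRange();
        if (range == IndexLongFieldRange.UNKNOWN || range.isComplete() == false) {
            return false; // not enough information recorded, the shards must be searched
        }
        if (range == IndexLongFieldRange.EMPTY) {
            return true; // no @timestamp values anywhere in the index
        }
        // a disjoint range cannot match any document in this index
        return range.getMax() < fromMillis || toMillis < range.getMin();
    }
}
```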
x-pack/plugin/frozen-indices/src/internalClusterTest/java/org/elasticsearch/index/engine/FrozenIndexIT.java diff --git a/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java b/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java index c8435296f2e06..461594390cd8d 100644 --- a/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java +++ b/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java @@ -19,6 +19,7 @@ package org.elasticsearch.cluster.action.shard; +import com.carrotsearch.hppc.cursors.ObjectObjectCursor; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; @@ -36,7 +37,9 @@ import org.elasticsearch.cluster.NotMasterException; import org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException; import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.RerouteService; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.allocation.AllocationService; @@ -50,6 +53,9 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.shard.IndexLongFieldRange; +import org.elasticsearch.index.shard.ShardLongFieldRange; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.node.NodeClosedException; import org.elasticsearch.tasks.Task; @@ -67,9 +73,11 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Locale; +import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.function.Predicate; @@ -519,16 +527,23 @@ public int hashCode() { public void shardStarted(final ShardRouting shardRouting, final long primaryTerm, final String message, + final ShardLongFieldRange timestampMillisRange, final ActionListener listener) { - shardStarted(shardRouting, primaryTerm, message, listener, clusterService.state()); + shardStarted(shardRouting, primaryTerm, message, timestampMillisRange, listener, clusterService.state()); } public void shardStarted(final ShardRouting shardRouting, final long primaryTerm, final String message, + final ShardLongFieldRange timestampMillisRange, final ActionListener listener, final ClusterState currentState) { - StartedShardEntry entry = new StartedShardEntry(shardRouting.shardId(), shardRouting.allocationId().getId(), primaryTerm, message); + final StartedShardEntry entry = new StartedShardEntry( + shardRouting.shardId(), + shardRouting.allocationId().getId(), + primaryTerm, + message, + timestampMillisRange); sendShardAction(SHARD_STARTED_ACTION_NAME, currentState, entry, listener); } @@ -578,6 +593,7 @@ public ClusterTasksResult execute(ClusterState currentState, List tasksToBeApplied = new ArrayList<>(); List shardRoutingsToBeApplied = new ArrayList<>(tasks.size()); Set seenShardRoutings = new HashSet<>(); // to prevent duplicates + final Map updatedTimestampRanges = new HashMap<>(); for (StartedShardEntry task : tasks) { final ShardRouting matched = currentState.getRoutingTable().getByAllocationId(task.shardId, 
task.allocationId); if (matched == null) { @@ -618,6 +634,22 @@ public ClusterTasksResult execute(ClusterState currentState, tasksToBeApplied.add(task); shardRoutingsToBeApplied.add(matched); seenShardRoutings.add(matched); + + // expand the timestamp range recorded in the index metadata if needed + final Index index = task.shardId.getIndex(); + IndexLongFieldRange currentTimestampMillisRange = updatedTimestampRanges.get(index); + final IndexMetadata indexMetadata = currentState.metadata().index(index); + if (currentTimestampMillisRange == null) { + currentTimestampMillisRange = indexMetadata.getTimestampMillisRange(); + } + final IndexLongFieldRange newTimestampMillisRange; + newTimestampMillisRange = currentTimestampMillisRange.extendWithShardRange( + task.shardId.id(), + indexMetadata.getNumberOfShards(), + task.timestampMillisRange); + if (newTimestampMillisRange != currentTimestampMillisRange) { + updatedTimestampRanges.put(index, newTimestampMillisRange); + } } } } @@ -627,6 +659,19 @@ public ClusterTasksResult execute(ClusterState currentState, ClusterState maybeUpdatedState = currentState; try { maybeUpdatedState = allocationService.applyStartedShards(currentState, shardRoutingsToBeApplied); + + if (updatedTimestampRanges.isEmpty() == false) { + final Metadata.Builder metadataBuilder = Metadata.builder(maybeUpdatedState.metadata()); + for (Map.Entry updatedTimestampRangeEntry : updatedTimestampRanges.entrySet()) { + metadataBuilder.put(IndexMetadata + .builder(metadataBuilder.getSafe(updatedTimestampRangeEntry.getKey())) + .timestampMillisRange(updatedTimestampRangeEntry.getValue())); + } + maybeUpdatedState = ClusterState.builder(maybeUpdatedState).metadata(metadataBuilder).build(); + } + + assert assertStartedIndicesHaveCompleteTimestampRanges(maybeUpdatedState); + builder.successes(tasksToBeApplied); } catch (Exception e) { logger.warn(() -> new ParameterizedMessage("failed to apply started shards {}", shardRoutingsToBeApplied), e); @@ -636,6 +681,16 @@ public ClusterTasksResult execute(ClusterState currentState, return builder.build(maybeUpdatedState); } + private static boolean assertStartedIndicesHaveCompleteTimestampRanges(ClusterState clusterState) { + for (ObjectObjectCursor cursor : clusterState.getRoutingTable().getIndicesRouting()) { + assert cursor.value.allPrimaryShardsActive() == false + || clusterState.metadata().index(cursor.key).getTimestampMillisRange().isComplete() + : "index [" + cursor.key + "] should have complete timestamp range, but got " + + clusterState.metadata().index(cursor.key).getTimestampMillisRange() + " for " + cursor.value.prettyPrint(); + } + return true; + } + @Override public void onFailure(String source, Exception e) { if (e instanceof FailedToCommitClusterStateException || e instanceof NotMasterException) { @@ -658,6 +713,7 @@ public static class StartedShardEntry extends TransportRequest { final String allocationId; final long primaryTerm; final String message; + final ShardLongFieldRange timestampMillisRange; StartedShardEntry(StreamInput in) throws IOException { super(in); @@ -676,13 +732,19 @@ public static class StartedShardEntry extends TransportRequest { final Exception ex = in.readException(); assert ex == null : "started shard must not have failure [" + ex + "]"; } + this.timestampMillisRange = ShardLongFieldRange.readFrom(in); } - public StartedShardEntry(final ShardId shardId, final String allocationId, final long primaryTerm, final String message) { + public StartedShardEntry(final ShardId shardId, + final String allocationId, + 
final long primaryTerm, + final String message, + final ShardLongFieldRange timestampMillisRange) { this.shardId = shardId; this.allocationId = allocationId; this.primaryTerm = primaryTerm; this.message = message; + this.timestampMillisRange = timestampMillisRange; } @Override @@ -699,6 +761,7 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getVersion().before(Version.V_6_3_0)) { out.writeException(null); } + timestampMillisRange.writeTo(out); } @Override diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java index b6a18ddfa2b3c..100c3fe26bd3f 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java @@ -55,6 +55,7 @@ import org.elasticsearch.index.Index; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.seqno.SequenceNumbers; +import org.elasticsearch.index.shard.IndexLongFieldRange; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.rest.RestStatus; @@ -340,6 +341,7 @@ public static APIBlock readFrom(StreamInput input) throws IOException { static final String KEY_ALIASES = "aliases"; static final String KEY_ROLLOVER_INFOS = "rollover_info"; static final String KEY_SYSTEM = "system"; + static final String KEY_TIMESTAMP_RANGE = "timestamp_range"; public static final String KEY_PRIMARY_TERMS = "primary_terms"; public static final String INDEX_STATE_FILE_PREFIX = "state-"; @@ -390,6 +392,8 @@ public static APIBlock readFrom(StreamInput input) throws IOException { private final ImmutableOpenMap rolloverInfos; private final boolean isSystem; + private final IndexLongFieldRange timestampMillisRange; + private IndexMetadata( final Index index, final long version, @@ -415,7 +419,8 @@ private IndexMetadata( final int routingPartitionSize, final ActiveShardCount waitForActiveShards, final ImmutableOpenMap rolloverInfos, - final boolean isSystem) { + final boolean isSystem, + final IndexLongFieldRange timestampMillisRange) { this.index = index; this.version = version; @@ -448,6 +453,7 @@ private IndexMetadata( this.waitForActiveShards = waitForActiveShards; this.rolloverInfos = rolloverInfos; this.isSystem = isSystem; + this.timestampMillisRange = timestampMillisRange; assert numberOfShards * routingFactor == routingNumShards : routingNumShards + " must be a multiple of " + numberOfShards; } @@ -661,6 +667,10 @@ public DiscoveryNodeFilters excludeFilters() { return excludeFilters; } + public IndexLongFieldRange getTimestampMillisRange() { + return timestampMillisRange; + } + @Override public boolean equals(Object o) { if (this == o) { @@ -770,6 +780,7 @@ private static class IndexMetadataDiff implements Diff { private final Diff>> inSyncAllocationIds; private final Diff> rolloverInfos; private final boolean isSystem; + private final IndexLongFieldRange timestampMillisRange; IndexMetadataDiff(IndexMetadata before, IndexMetadata after) { index = after.index.getName(); @@ -788,6 +799,7 @@ private static class IndexMetadataDiff implements Diff { DiffableUtils.getVIntKeySerializer(), DiffableUtils.StringSetValueSerializer.getInstance()); rolloverInfos = DiffableUtils.diff(before.rolloverInfos, after.rolloverInfos, DiffableUtils.getStringKeySerializer()); isSystem = after.isSystem; + timestampMillisRange = after.timestampMillisRange; } private static final DiffableUtils.DiffableValueReader 
ALIAS_METADATA_DIFF_VALUE_READER = @@ -833,6 +845,7 @@ private static class IndexMetadataDiff implements Diff { } else { isSystem = false; } + timestampMillisRange = IndexLongFieldRange.readFrom(in); } @Override @@ -862,6 +875,7 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getVersion().onOrAfter(SYSTEM_INDEX_FLAG_ADDED)) { out.writeBoolean(isSystem); } + timestampMillisRange.writeTo(out); } @Override @@ -881,6 +895,7 @@ public IndexMetadata apply(IndexMetadata part) { builder.inSyncAllocationIds.putAll(inSyncAllocationIds.apply(part.inSyncAllocationIds)); builder.rolloverInfos.putAll(rolloverInfos.apply(part.rolloverInfos)); builder.system(part.isSystem); + builder.timestampMillisRange(timestampMillisRange); return builder.build(); } } @@ -938,6 +953,7 @@ public static IndexMetadata readFrom(StreamInput in) throws IOException { if (in.getVersion().onOrAfter(SYSTEM_INDEX_FLAG_ADDED)) { builder.system(in.readBoolean()); } + builder.timestampMillisRange(IndexLongFieldRange.readFrom(in)); return builder.build(); } @@ -989,6 +1005,7 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getVersion().onOrAfter(SYSTEM_INDEX_FLAG_ADDED)) { out.writeBoolean(isSystem); } + timestampMillisRange.writeTo(out); } public boolean isSystem() { @@ -1020,6 +1037,7 @@ public static class Builder { private final ImmutableOpenMap.Builder rolloverInfos; private Integer routingNumShards; private boolean isSystem; + private IndexLongFieldRange timestampMillisRange = IndexLongFieldRange.NO_SHARDS; public Builder(String index) { this.index = index; @@ -1047,6 +1065,7 @@ public Builder(IndexMetadata indexMetadata) { this.inSyncAllocationIds = ImmutableOpenIntMap.builder(indexMetadata.inSyncAllocationIds); this.rolloverInfos = ImmutableOpenMap.builder(indexMetadata.rolloverInfos); this.isSystem = indexMetadata.isSystem; + this.timestampMillisRange = indexMetadata.timestampMillisRange; } public Builder index(String index) { @@ -1255,6 +1274,15 @@ public boolean isSystem() { return isSystem; } + public Builder timestampMillisRange(IndexLongFieldRange timestampMillisRange) { + this.timestampMillisRange = timestampMillisRange; + return this; + } + + public IndexLongFieldRange getTimestampMillisRange() { + return timestampMillisRange; + } + public IndexMetadata build() { ImmutableOpenMap.Builder tmpAliases = aliases; Settings tmpSettings = settings; @@ -1368,7 +1396,8 @@ public IndexMetadata build() { routingPartitionSize, waitForActiveShards, rolloverInfos.build(), - isSystem); + isSystem, + timestampMillisRange); } public static void toXContent(IndexMetadata indexMetadata, XContentBuilder builder, ToXContent.Params params) throws IOException { @@ -1469,6 +1498,10 @@ public static void toXContent(IndexMetadata indexMetadata, XContentBuilder build builder.endObject(); builder.field(KEY_SYSTEM, indexMetadata.isSystem); + builder.startObject(KEY_TIMESTAMP_RANGE); + indexMetadata.timestampMillisRange.toXContent(builder, params); + builder.endObject(); + builder.endObject(); } @@ -1549,6 +1582,8 @@ public static IndexMetadata fromXContent(XContentParser parser) throws IOExcepti // simply ignored when upgrading from 2.x assert Version.CURRENT.major <= 5; parser.skipChildren(); + } else if (KEY_TIMESTAMP_RANGE.equals(currentFieldName)) { + builder.timestampMillisRange(IndexLongFieldRange.fromXContent(parser)); } else { // assume it's custom index metadata builder.putCustom(currentFieldName, parser.mapStrings()); diff --git 
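The new `timestamp_range` object emitted by `IndexMetadata.toXContent` takes one of a few shapes corresponding to the `IndexLongFieldRange` sentinels introduced further down in this patch. A rough rendering sketch (assuming the usual `XContentFactory` and `Strings.toString` helpers from the 7.x codebase); the `{"shards":[]}` form for `NO_SHARDS` is what the updated test expectations later in this diff show for a freshly created index:

```java
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.shard.IndexLongFieldRange;

public class TimestampRangeXContentDemo {

    // wrap the fragment in an object, as IndexMetadata.toXContent does for KEY_TIMESTAMP_RANGE
    static String render(IndexLongFieldRange range) throws Exception {
        XContentBuilder builder = XContentFactory.jsonBuilder();
        builder.startObject();
        range.toXContent(builder, ToXContent.EMPTY_PARAMS);
        builder.endObject();
        return Strings.toString(builder);
    }

    public static void main(String[] args) throws Exception {
        System.out.println(render(IndexLongFieldRange.NO_SHARDS)); // {"shards":[]}
        System.out.println(render(IndexLongFieldRange.UNKNOWN));   // {"unknown":true}
        System.out.println(render(IndexLongFieldRange.EMPTY));     // {"empty":true}
    }
}
```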
a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexStateService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexStateService.java index 36a996defe996..755a46ce1cb7e 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexStateService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexStateService.java @@ -68,6 +68,7 @@ import org.elasticsearch.common.util.concurrent.CountDown; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexNotFoundException; +import org.elasticsearch.index.shard.IndexLongFieldRange; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.ShardLimitValidator; @@ -769,6 +770,7 @@ static Tuple> closeRoutingTable(final Clus routingTable.remove(index.getName()); } else { metadata.put(updatedMetadata + .timestampMillisRange(IndexLongFieldRange.NO_SHARDS) .settingsVersion(indexMetadata.getSettingsVersion() + 1) .settings(Settings.builder() .put(indexMetadata.getSettings()) @@ -858,6 +860,7 @@ ClusterState openIndices(final Index[] indices, final ClusterState currentState) .state(IndexMetadata.State.OPEN) .settingsVersion(indexMetadata.getSettingsVersion() + 1) .settings(updatedSettings) + .timestampMillisRange(IndexLongFieldRange.NO_SHARDS) .build(); // The index might be closed because we couldn't import it due to old incompatible version diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/IndexMetadataUpdater.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/IndexMetadataUpdater.java index 9b07766ab09c1..d9d2caf738217 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/IndexMetadataUpdater.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/IndexMetadataUpdater.java @@ -170,6 +170,8 @@ private IndexMetadata.Builder updateInSyncAllocations(RoutingTable newRoutingTab final String allocationId; if (recoverySource == RecoverySource.ExistingStoreRecoverySource.FORCE_STALE_PRIMARY_INSTANCE) { allocationId = RecoverySource.ExistingStoreRecoverySource.FORCED_ALLOCATION_ID; + indexMetadataBuilder.timestampMillisRange(indexMetadataBuilder.getTimestampMillisRange() + .removeShard(shardId.id(), oldIndexMetadata.getNumberOfShards())); } else { assert recoverySource instanceof RecoverySource.SnapshotRecoverySource : recoverySource; allocationId = updates.initializedPrimary.allocationId().getId(); diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index e69735ed31682..b767313eb69a3 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -72,6 +72,7 @@ import org.elasticsearch.index.seqno.SeqNoStats; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.DocsStats; +import org.elasticsearch.index.shard.ShardLongFieldRange; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.translog.Translog; @@ -1980,4 +1981,12 @@ public interface TranslogRecoveryRunner { public enum HistorySource { TRANSLOG, INDEX } + + /** + * @return a {@link ShardLongFieldRange} containing the min and max raw values of the given field for this shard if the engine + * guarantees these values never to change, or {@link 
ShardLongFieldRange#EMPTY} if this field is empty, or + * {@link ShardLongFieldRange#UNKNOWN} if this field's value range may change in future. + */ + public abstract ShardLongFieldRange getRawFieldRange(String field) throws IOException; + } diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 0dfcc1c4a4885..3ede722fcdb7f 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -89,6 +89,7 @@ import org.elasticsearch.index.seqno.SeqNoStats; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.ElasticsearchMergePolicy; +import org.elasticsearch.index.shard.ShardLongFieldRange; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.translog.Translog; @@ -2957,4 +2958,9 @@ SeqNoFieldMapper.NAME, getPersistedLocalCheckpoint() + 1, Long.MAX_VALUE), Boole refresh("restore_version_map_and_checkpoint_tracker", SearcherScope.INTERNAL, true); } + @Override + public ShardLongFieldRange getRawFieldRange(String field) { + return ShardLongFieldRange.UNKNOWN; + } + } diff --git a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java index 288a61d76ba5b..48ce733b69049 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java @@ -18,9 +18,11 @@ */ package org.elasticsearch.index.engine; +import org.apache.lucene.document.LongPoint; import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.IndexCommit; import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.PointValues; import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.index.SoftDeletesDirectoryReaderWrapper; import org.apache.lucene.search.ReferenceManager; @@ -35,6 +37,7 @@ import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.seqno.SeqNoStats; import org.elasticsearch.index.seqno.SequenceNumbers; +import org.elasticsearch.index.shard.ShardLongFieldRange; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.TranslogConfig; @@ -530,4 +533,27 @@ protected static DirectoryReader openDirectory(Directory directory, boolean wrap public CompletionStats completionStats(String... fieldNamePatterns) { return completionStatsCache.get(fieldNamePatterns); } + + /** + * @return a {@link ShardLongFieldRange} containing the min and max raw values of the given field for this shard, or {@link + * ShardLongFieldRange#EMPTY} if this field is not found or empty. 
+ */ + @Override + public ShardLongFieldRange getRawFieldRange(String field) throws IOException { + try (Searcher searcher = acquireSearcher("field_range")) { + final DirectoryReader directoryReader = searcher.getDirectoryReader(); + + final byte[] minPackedValue = PointValues.getMinPackedValue(directoryReader, field); + final byte[] maxPackedValue = PointValues.getMaxPackedValue(directoryReader, field); + + if (minPackedValue == null || maxPackedValue == null) { + assert minPackedValue == null && maxPackedValue == null + : Arrays.toString(minPackedValue) + "-" + Arrays.toString(maxPackedValue); + return ShardLongFieldRange.EMPTY; + } + + return ShardLongFieldRange.of(LongPoint.decodeDimension(minPackedValue, 0), LongPoint.decodeDimension(maxPackedValue, 0)); + } + } + } diff --git a/server/src/main/java/org/elasticsearch/index/mapper/DateFieldMapper.java b/server/src/main/java/org/elasticsearch/index/mapper/DateFieldMapper.java index 9acca3102d040..e3193e56befc0 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/DateFieldMapper.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/DateFieldMapper.java @@ -99,6 +99,16 @@ public long parsePointAsMillis(byte[] value) { return LongPoint.decodeDimension(value, 0); } + @Override + public long roundDownToMillis(long value) { + return value; + } + + @Override + public long roundUpToMillis(long value) { + return value; + } + @Override protected Query distanceFeatureQuery(String field, float boost, long origin, TimeValue pivot) { return LongPoint.newDistanceFeatureQuery(field, boost, origin, pivot.getMillis()); @@ -122,7 +132,22 @@ public Instant clampToValidRange(Instant instant) { @Override public long parsePointAsMillis(byte[] value) { - return DateUtils.toMilliSeconds(LongPoint.decodeDimension(value, 0)); + return roundDownToMillis(LongPoint.decodeDimension(value, 0)); + } + + @Override + public long roundDownToMillis(long value) { + return DateUtils.toMilliSeconds(value); + } + + @Override + public long roundUpToMillis(long value) { + if (value <= 0L) { + // if negative then throws an IAE; if zero then return zero + return DateUtils.toMilliSeconds(value); + } else { + return DateUtils.toMilliSeconds(value - 1L) + 1L; + } } @Override @@ -168,6 +193,16 @@ NumericType numericType() { */ public abstract long parsePointAsMillis(byte[] value); + /** + * Round the given raw value down to a number of milliseconds since the epoch. + */ + public abstract long roundDownToMillis(long value); + + /** + * Round the given raw value up to a number of milliseconds since the epoch. + */ + public abstract long roundUpToMillis(long value); + public static Resolution ofOrdinal(int ord) { for (Resolution resolution : values()) { if (ord == resolution.ordinal()) { diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexLongFieldRange.java b/server/src/main/java/org/elasticsearch/index/shard/IndexLongFieldRange.java new file mode 100644 index 0000000000000..3f6a6b0be058f --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexLongFieldRange.java @@ -0,0 +1,370 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. 
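The `ReadOnlyEngine` implementation above reads the per-field point (BKD) metadata that Lucene already maintains, so computing the range costs no document visits. A standalone Lucene sketch (plain Lucene, not Elasticsearch code; the index contents are invented) of the same `PointValues.getMinPackedValue`/`getMaxPackedValue` plus `LongPoint.decodeDimension` pattern:

```java
import org.apache.lucene.document.Document;
import org.apache.lucene.document.LongPoint;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.PointValues;
import org.apache.lucene.store.ByteBuffersDirectory;
import org.apache.lucene.store.Directory;

public class PointRangeDemo {
    public static void main(String[] args) throws Exception {
        try (Directory dir = new ByteBuffersDirectory();
             IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig())) {
            for (long timestamp : new long[] { 1500000000000L, 1600000000000L, 1610000000000L }) {
                Document doc = new Document();
                doc.add(new LongPoint("@timestamp", timestamp)); // indexed as an 8-byte point
                writer.addDocument(doc);
            }
            writer.commit();
            try (DirectoryReader reader = DirectoryReader.open(dir)) {
                byte[] min = PointValues.getMinPackedValue(reader, "@timestamp");
                byte[] max = PointValues.getMaxPackedValue(reader, "@timestamp");
                // both are null if the field is absent from every segment
                System.out.println(LongPoint.decodeDimension(min, 0)   // 1500000000000
                        + " .. " + LongPoint.decodeDimension(max, 0)); // 1610000000000
            }
        }
    }
}
```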
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.shard; + +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.ToXContentFragment; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import java.util.stream.IntStream; + +import static org.elasticsearch.index.shard.ShardLongFieldRange.LONG_FIELD_RANGE_VERSION_INTRODUCED; + +/** + * Class representing an (inclusive) range of {@code long} values in a field in an index which may comprise multiple shards. This + * information is accumulated shard-by-shard, and we keep track of which shards are represented in this value. Only once all shards are + * represented should this information be considered accurate for the index. + */ +public class IndexLongFieldRange implements Writeable, ToXContentFragment { + + /** + * Sentinel value indicating that no information is currently available, for instance because the index has just been created. + */ + public static final IndexLongFieldRange NO_SHARDS = new IndexLongFieldRange(new int[0], Long.MAX_VALUE, Long.MIN_VALUE); + + /** + * Sentinel value indicating an empty range, for instance because the field is missing or has no values in any shard. + */ + public static final IndexLongFieldRange EMPTY = new IndexLongFieldRange(null, Long.MAX_VALUE, Long.MIN_VALUE); + + /** + * Sentinel value indicating the actual range is unknown, for instance because more docs may be added in future. + */ + public static final IndexLongFieldRange UNKNOWN = new IndexLongFieldRange(null, Long.MIN_VALUE, Long.MAX_VALUE); + + @Nullable // if this range includes all shards + private final int[] shards; + private final long min, max; + + private IndexLongFieldRange(int[] shards, long min, long max) { + assert (min == Long.MAX_VALUE && max == Long.MIN_VALUE) || min <= max : min + " vs " + max; + assert shards == null || shards.length > 0 || (min == Long.MAX_VALUE && max == Long.MIN_VALUE) : Arrays.toString(shards); + assert shards == null || Arrays.equals(shards, Arrays.stream(shards).sorted().distinct().toArray()) : Arrays.toString(shards); + this.shards = shards; + this.min = min; + this.max = max; + } + + /** + * @return whether this range includes information from all shards yet. + */ + public boolean isComplete() { + return shards == null; + } + + // exposed for testing + int[] getShards() { + return shards; + } + + // exposed for testing + long getMinUnsafe() { + return min; + } + + // exposed for testing + long getMaxUnsafe() { + return max; + } + + /** + * @return the (inclusive) minimum of this range. 
+ */ + public long getMin() { + assert shards == null : "min is meaningless if we don't have data from all shards yet"; + assert this != EMPTY : "min is meaningless if range is empty"; + assert this != UNKNOWN : "min is meaningless if range is unknown"; + return min; + } + + /** + * @return the (inclusive) maximum of this range. + */ + public long getMax() { + assert shards == null : "max is meaningless if we don't have data from all shards yet"; + assert this != EMPTY : "max is meaningless if range is empty"; + assert this != UNKNOWN : "max is meaningless if range is unknown"; + return max; + } + + private static final byte WIRE_TYPE_OTHER = (byte)0; + private static final byte WIRE_TYPE_NO_SHARDS = (byte)1; + private static final byte WIRE_TYPE_UNKNOWN = (byte)2; + private static final byte WIRE_TYPE_EMPTY = (byte)3; + + @Override + public void writeTo(StreamOutput out) throws IOException { + if (out.getVersion().onOrAfter(LONG_FIELD_RANGE_VERSION_INTRODUCED)) { + if (this == NO_SHARDS) { + out.writeByte(WIRE_TYPE_NO_SHARDS); + } else if (this == UNKNOWN) { + out.writeByte(WIRE_TYPE_UNKNOWN); + } else if (this == EMPTY) { + out.writeByte(WIRE_TYPE_EMPTY); + } else { + out.writeByte(WIRE_TYPE_OTHER); + if (shards == null) { + out.writeBoolean(false); + } else { + out.writeBoolean(true); + out.writeVIntArray(shards); + } + out.writeZLong(min); + out.writeZLong(max); + } + } + } + + public static IndexLongFieldRange readFrom(StreamInput in) throws IOException { + if (in.getVersion().before(LONG_FIELD_RANGE_VERSION_INTRODUCED)) { + // conservative treatment for BWC + return UNKNOWN; + } + + final byte type = in.readByte(); + switch (type) { + case WIRE_TYPE_NO_SHARDS: + return NO_SHARDS; + case WIRE_TYPE_UNKNOWN: + return UNKNOWN; + case WIRE_TYPE_EMPTY: + return EMPTY; + case WIRE_TYPE_OTHER: + return new IndexLongFieldRange(in.readBoolean() ? 
in.readVIntArray() : null, in.readZLong(), in.readZLong()); + default: + throw new IllegalStateException("type [" + type + "] not known"); + } + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + if (this == UNKNOWN) { + builder.field("unknown", true); + } else if (this == EMPTY) { + builder.field("empty", true); + } else if (this == NO_SHARDS) { + builder.startArray("shards"); + builder.endArray(); + } else { + builder.field("min", min); + builder.field("max", max); + if (shards != null) { + builder.startArray("shards"); + for (int shard : shards) { + builder.value(shard); + } + builder.endArray(); + } + } + return builder; + } + + public static IndexLongFieldRange fromXContent(XContentParser parser) throws IOException { + XContentParser.Token token; + String currentFieldName = null; + Boolean isUnknown = null; + Boolean isEmpty = null; + Long min = null; + Long max = null; + List shardsList = null; + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + if (token == XContentParser.Token.FIELD_NAME) { + currentFieldName = parser.currentName(); + } else if (token.isValue()) { + if ("unknown".equals(currentFieldName)) { + if (Boolean.FALSE.equals(isUnknown)) { + throw new IllegalArgumentException("unexpected field 'unknown'"); + } else { + isUnknown = Boolean.TRUE; + isEmpty = Boolean.FALSE; + } + } else if ("empty".equals(currentFieldName)) { + if (Boolean.FALSE.equals(isEmpty)) { + throw new IllegalArgumentException("unexpected field 'empty'"); + } else { + isUnknown = Boolean.FALSE; + isEmpty = Boolean.TRUE; + } + } else if ("min".equals(currentFieldName)) { + if (Boolean.TRUE.equals(isUnknown) || Boolean.TRUE.equals(isEmpty)) { + throw new IllegalArgumentException("unexpected field 'min'"); + } else { + isUnknown = Boolean.FALSE; + isEmpty = Boolean.FALSE; + min = parser.longValue(); + } + } else if ("max".equals(currentFieldName)) { + if (Boolean.TRUE.equals(isUnknown) || Boolean.TRUE.equals(isEmpty)) { + throw new IllegalArgumentException("unexpected field 'max'"); + } else { + isUnknown = Boolean.FALSE; + isEmpty = Boolean.FALSE; + max = parser.longValue(); + } + } + } else if (token == XContentParser.Token.START_ARRAY) { + if ("shards".equals(currentFieldName)) { + if (Boolean.TRUE.equals(isUnknown) || Boolean.TRUE.equals(isEmpty) || shardsList != null) { + throw new IllegalArgumentException("unexpected array 'shards'"); + } else { + isUnknown = Boolean.FALSE; + isEmpty = Boolean.FALSE; + shardsList = new ArrayList<>(); + while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) { + if (token.isValue()) { + shardsList.add(parser.intValue()); + } + } + } + } else { + throw new IllegalArgumentException("Unexpected array: " + currentFieldName); + } + } else { + throw new IllegalArgumentException("Unexpected token: " + token); + } + } + + if (Boolean.TRUE.equals(isUnknown)) { + //noinspection ConstantConditions this assertion is always true but left here for the benefit of readers + assert min == null && max == null && shardsList == null && Boolean.FALSE.equals(isEmpty); + return UNKNOWN; + } else if (Boolean.TRUE.equals(isEmpty)) { + //noinspection ConstantConditions this assertion is always true but left here for the benefit of readers + assert min == null && max == null && shardsList == null && Boolean.FALSE.equals(isUnknown); + return EMPTY; + } else if (shardsList != null && shardsList.isEmpty()) { + //noinspection ConstantConditions this assertion is always true but left here for the 
benefit of readers + assert min == null && max == null && Boolean.FALSE.equals(isEmpty) && Boolean.FALSE.equals(isUnknown); + return NO_SHARDS; + } else if (min != null) { + //noinspection ConstantConditions this assertion is always true but left here for the benefit of readers + assert Boolean.FALSE.equals(isUnknown) && Boolean.FALSE.equals(isEmpty); + if (max == null) { + throw new IllegalArgumentException("field 'max' unexpectedly missing"); + } + final int[] shards; + if (shardsList != null) { + shards = shardsList.stream().mapToInt(i -> i).toArray(); + assert shards.length > 0; + } else { + shards = null; + } + return new IndexLongFieldRange(shards, min, max); + } else { + throw new IllegalArgumentException("field range contents unexpectedly missing"); + } + } + + public IndexLongFieldRange extendWithShardRange(int shardId, int shardCount, ShardLongFieldRange shardFieldRange) { + if (shardFieldRange == ShardLongFieldRange.UNKNOWN) { + assert shards == null + ? this == UNKNOWN + : Arrays.stream(shards).noneMatch(i -> i == shardId); + return UNKNOWN; + } + if (shards == null || Arrays.stream(shards).anyMatch(i -> i == shardId)) { + assert shardFieldRange == ShardLongFieldRange.EMPTY || min <= shardFieldRange.getMin() && shardFieldRange.getMax() <= max; + return this; + } + final int[] newShards; + if (shards.length == shardCount - 1) { + assert Arrays.equals(shards, IntStream.range(0, shardCount).filter(i -> i != shardId).toArray()) + : Arrays.toString(shards) + " + " + shardId; + if (shardFieldRange == ShardLongFieldRange.EMPTY && min == EMPTY.min && max == EMPTY.max) { + return EMPTY; + } + newShards = null; + } else { + newShards = IntStream.concat(Arrays.stream(this.shards), IntStream.of(shardId)).sorted().toArray(); + } + if (shardFieldRange == ShardLongFieldRange.EMPTY) { + return new IndexLongFieldRange(newShards, min, max); + } else { + return new IndexLongFieldRange(newShards, Math.min(shardFieldRange.getMin(), min), Math.max(shardFieldRange.getMax(), max)); + } + } + + @Override + public String toString() { + if (this == NO_SHARDS) { + return "NO_SHARDS"; + } else if (this == UNKNOWN) { + return "UNKNOWN"; + } else if (this == EMPTY) { + return "EMPTY"; + } else if (shards == null) { + return "[" + min + "-" + max + "]"; + } else { + return "[" + min + "-" + max + ", shards=" + Arrays.toString(shards) + "]"; + } + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + if (this == EMPTY || this == UNKNOWN || this == NO_SHARDS || o == EMPTY || o == UNKNOWN || o == NO_SHARDS) return false; + IndexLongFieldRange that = (IndexLongFieldRange) o; + return min == that.min && + max == that.max && + Arrays.equals(shards, that.shards); + } + + @Override + public int hashCode() { + int result = Objects.hash(min, max); + result = 31 * result + Arrays.hashCode(shards); + return result; + } + + /** + * Remove the given shard from the set of known shards, possibly without adjusting the min and max. Used when allocating a stale primary + * which may have a different range from the original, so we must allow the range to grow. Note that this doesn't usually allow the + * range to shrink, so we may in theory hit this shard more than needed after allocating a stale primary. 
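Putting the accumulation API together, a usage sketch (shard count and timestamps invented) of `extendWithShardRange` as the master applies it while started-shard tasks arrive, using only the methods defined above. The index-wide range only becomes usable once every shard is represented:

```java
import org.elasticsearch.index.shard.IndexLongFieldRange;
import org.elasticsearch.index.shard.ShardLongFieldRange;

public class ExtendWithShardRangeDemo {
    public static void main(String[] args) {
        final int shardCount = 2;
        IndexLongFieldRange range = IndexLongFieldRange.NO_SHARDS; // a freshly created index

        // shard 0 starts and reports its local @timestamp range
        range = range.extendWithShardRange(0, shardCount, ShardLongFieldRange.of(1000L, 2000L));
        System.out.println(range.isComplete()); // false: shard 1 has not reported yet

        // shard 1 starts; the index range now covers both shards
        range = range.extendWithShardRange(1, shardCount, ShardLongFieldRange.of(1500L, 3000L));
        System.out.println(range.isComplete()); // true
        System.out.println(range.getMin() + ".." + range.getMax()); // 1000..3000
    }
}
```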
+ */ + public IndexLongFieldRange removeShard(int shardId, int numberOfShards) { + assert 0 <= shardId && shardId < numberOfShards : shardId + " vs " + numberOfShards; + + if (shards != null && Arrays.stream(shards).noneMatch(i -> i == shardId)) { + return this; + } + if (shards == null && numberOfShards == 1) { + return NO_SHARDS; + } + if (this == UNKNOWN) { + return this; + } + if (shards != null && shards.length == 1 && shards[0] == shardId) { + return NO_SHARDS; + } + + final IntStream currentShards = shards == null ? IntStream.range(0, numberOfShards) : Arrays.stream(shards); + return new IndexLongFieldRange(currentShards.filter(i -> i != shardId).toArray(), min, max); + } +} diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 90acd27b937e5..2d4824bb4387c 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -50,6 +50,7 @@ import org.elasticsearch.action.admin.indices.upgrade.post.UpgradeRequest; import org.elasticsearch.action.support.replication.PendingReplicationActions; import org.elasticsearch.action.support.replication.ReplicationResponse; +import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.MappingMetadata; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; @@ -107,10 +108,12 @@ import org.elasticsearch.index.flush.FlushStats; import org.elasticsearch.index.get.GetStats; import org.elasticsearch.index.get.ShardGetService; +import org.elasticsearch.index.mapper.DateFieldMapper; import org.elasticsearch.index.mapper.DocumentMapper; import org.elasticsearch.index.mapper.DocumentMapperForType; import org.elasticsearch.index.mapper.IdFieldMapper; import org.elasticsearch.index.mapper.MapperParsingException; +import org.elasticsearch.index.mapper.MappedFieldType; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.Mapping; import org.elasticsearch.index.mapper.ParsedDocument; @@ -1772,6 +1775,41 @@ public RecoveryState recoveryState() { return this.recoveryState; } + @Override + public ShardLongFieldRange getTimestampMillisRange() { + if (mapperService() == null) { + return ShardLongFieldRange.UNKNOWN; // no mapper service, no idea if the field even exists + } + final MappedFieldType mappedFieldType = mapperService().fieldType(DataStream.TimestampField.FIXED_TIMESTAMP_FIELD); + if (mappedFieldType instanceof DateFieldMapper.DateFieldType == false) { + return ShardLongFieldRange.UNKNOWN; // field missing or not a date + } + final DateFieldMapper.DateFieldType dateFieldType = (DateFieldMapper.DateFieldType) mappedFieldType; + + final ShardLongFieldRange rawTimestampFieldRange; + try { + rawTimestampFieldRange = getEngine().getRawFieldRange(DataStream.TimestampField.FIXED_TIMESTAMP_FIELD); + } catch (IOException | AlreadyClosedException e) { + logger.debug("exception obtaining range for timestamp field", e); + return ShardLongFieldRange.UNKNOWN; + } + if (rawTimestampFieldRange == ShardLongFieldRange.UNKNOWN) { + return ShardLongFieldRange.UNKNOWN; + } + if (rawTimestampFieldRange == ShardLongFieldRange.EMPTY) { + return ShardLongFieldRange.EMPTY; + } + + try { + return ShardLongFieldRange.of( + dateFieldType.resolution().roundDownToMillis(rawTimestampFieldRange.getMin()), + 
dateFieldType.resolution().roundUpToMillis(rawTimestampFieldRange.getMax())); + } catch (IllegalArgumentException e) { + logger.debug(new ParameterizedMessage("could not convert {} to a millisecond time range", rawTimestampFieldRange), e); + return ShardLongFieldRange.UNKNOWN; // any search might match this shard + } + } + /** * perform the last stages of recovery once all translog operations are done. * note that you should still call {@link #postRecovery(String)}. @@ -2719,7 +2757,7 @@ private void executeRecovery(String reason, RecoveryState recoveryState, PeerRec markAsRecovering(reason, recoveryState); // mark the shard as recovering on the cluster state thread threadPool.generic().execute(ActionRunnable.wrap(ActionListener.wrap(r -> { if (r) { - recoveryListener.onRecoveryDone(recoveryState); + recoveryListener.onRecoveryDone(recoveryState, getTimestampMillisRange()); } }, e -> recoveryListener.onRecoveryFailure(recoveryState, new RecoveryFailedException(recoveryState, null, e), true)), action)); diff --git a/server/src/main/java/org/elasticsearch/index/shard/ShardLongFieldRange.java b/server/src/main/java/org/elasticsearch/index/shard/ShardLongFieldRange.java new file mode 100644 index 0000000000000..ff54df299dbfe --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/shard/ShardLongFieldRange.java @@ -0,0 +1,141 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.shard; + +import org.elasticsearch.Version; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; + +import java.io.IOException; +import java.util.Objects; + +/** + * Class representing an (inclusive) range of {@code long} values in a field in a single shard. + */ +public class ShardLongFieldRange implements Writeable { + + public static final Version LONG_FIELD_RANGE_VERSION_INTRODUCED = Version.V_7_11_0; + + /** + * Sentinel value indicating an empty range, for instance because the field is missing or has no values. + */ + public static final ShardLongFieldRange EMPTY = new ShardLongFieldRange(Long.MAX_VALUE, Long.MIN_VALUE); + + /** + * Sentinel value indicating the actual range is unknown, for instance because more docs may be added in future. + */ + public static final ShardLongFieldRange UNKNOWN = new ShardLongFieldRange(Long.MIN_VALUE, Long.MAX_VALUE); + + /** + * Construct a new {@link ShardLongFieldRange} with the given (inclusive) minimum and maximum. 
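For nanosecond-resolution indices the shard's raw range is widened, never narrowed, when converted to milliseconds: the minimum rounds down and the maximum rounds up, so the recorded range always covers the underlying data and range-based skipping cannot miss a document. A plain-Java sketch of that arithmetic, mirroring the `toMilliSeconds(value - 1) + 1` trick above but simplifying away `DateUtils` (which additionally rejects negative nanosecond timestamps):

```java
public class NanosToMillisRounding {

    // floor division, valid for the non-negative nanosecond timestamps used here
    static long roundDownToMillis(long nanos) {
        return nanos / 1_000_000L;
    }

    // ceiling: same result as toMilliSeconds(value - 1) + 1 for positive values
    static long roundUpToMillis(long nanos) {
        return nanos <= 0L ? nanos / 1_000_000L : (nanos - 1L) / 1_000_000L + 1L;
    }

    public static void main(String[] args) {
        long nanos = 1_500_000_123_456_789L;             // falls inside millisecond 1_500_000_123_456
        System.out.println(roundDownToMillis(nanos));    // 1500000123456
        System.out.println(roundUpToMillis(nanos));      // 1500000123457 (rounded up, conservative)
        System.out.println(roundUpToMillis(2_000_000L)); // 2 (exactly on a millisecond boundary)
    }
}
```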
+ */ + public static ShardLongFieldRange of(long min, long max) { + assert min <= max : min + " vs " + max; + return new ShardLongFieldRange(min, max); + } + + private final long min, max; + + private ShardLongFieldRange(long min, long max) { + this.min = min; + this.max = max; + } + + /** + * @return the (inclusive) minimum of this range. + */ + public long getMin() { + assert this != EMPTY && this != UNKNOWN && min <= max: "must not use actual min of sentinel values"; + return min; + } + + /** + * @return the (inclusive) maximum of this range. + */ + public long getMax() { + assert this != EMPTY && this != UNKNOWN && min <= max : "must not use actual max of sentinel values"; + return max; + } + + @Override + public String toString() { + if (this == UNKNOWN) { + return "UNKNOWN"; + } else if (this == EMPTY) { + return "EMPTY"; + } else { + return "[" + min + "-" + max + "]"; + } + } + + private static final byte WIRE_TYPE_OTHER = (byte)0; + private static final byte WIRE_TYPE_UNKNOWN = (byte)1; + private static final byte WIRE_TYPE_EMPTY = (byte)2; + + public static ShardLongFieldRange readFrom(StreamInput in) throws IOException { + if (in.getVersion().before(LONG_FIELD_RANGE_VERSION_INTRODUCED)) { + // conservative treatment for BWC + return UNKNOWN; + } + + final byte type = in.readByte(); + switch (type) { + case WIRE_TYPE_UNKNOWN: + return UNKNOWN; + case WIRE_TYPE_EMPTY: + return EMPTY; + case WIRE_TYPE_OTHER: + return ShardLongFieldRange.of(in.readZLong(), in.readZLong()); + default: + throw new IllegalStateException("type [" + type + "] not known"); + } + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + if (out.getVersion().onOrAfter(LONG_FIELD_RANGE_VERSION_INTRODUCED)) { + if (this == UNKNOWN) { + out.writeByte(WIRE_TYPE_UNKNOWN); + } else if (this == EMPTY) { + out.writeByte(WIRE_TYPE_EMPTY); + } else { + out.writeByte(WIRE_TYPE_OTHER); + out.writeZLong(min); + out.writeZLong(max); + } + } + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + if (this == EMPTY || this == UNKNOWN || o == EMPTY || o == UNKNOWN) return false; + final ShardLongFieldRange that = (ShardLongFieldRange) o; + return min == that.min && max == that.max; + } + + @Override + public int hashCode() { + return Objects.hash(min, max); + } +} + diff --git a/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java index f918bdf97f132..ff2d71ea3a4fc 100644 --- a/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java +++ b/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java @@ -58,6 +58,7 @@ import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardRelocatedException; import org.elasticsearch.index.shard.IndexShardState; +import org.elasticsearch.index.shard.ShardLongFieldRange; import org.elasticsearch.index.shard.PrimaryReplicaSyncer; import org.elasticsearch.index.shard.PrimaryReplicaSyncer.ResyncTask; import org.elasticsearch.index.shard.ShardId; @@ -635,9 +636,14 @@ private void updateShard(DiscoveryNodes nodes, ShardRouting shardRouting, Shard shardRouting.shardId(), state, nodes.getMasterNode()); } if (nodes.getMasterNode() != null) { - shardStateAction.shardStarted(shardRouting, primaryTerm, "master " + nodes.getMasterNode() + - " marked shard as initializing, but 
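A test-style sketch of the `ShardLongFieldRange` wire format above, including the backwards-compatibility path where a stream to or from a node older than `LONG_FIELD_RANGE_VERSION_INTRODUCED` carries no range at all and therefore reads back as `UNKNOWN` (this assumes the usual `BytesStreamOutput`/`setVersion` wire-test idioms and that `Version.V_7_10_0` predates the introduction version):

```java
import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.index.shard.ShardLongFieldRange;

public class ShardLongFieldRangeWireDemo {
    public static void main(String[] args) throws Exception {
        // current-version round trip preserves the range
        BytesStreamOutput out = new BytesStreamOutput();
        ShardLongFieldRange.of(1000L, 2000L).writeTo(out);
        StreamInput in = out.bytes().streamInput();
        System.out.println(ShardLongFieldRange.readFrom(in)); // [1000-2000]

        // pre-7.11 round trip: writeTo emits nothing, readFrom falls back to UNKNOWN
        BytesStreamOutput oldOut = new BytesStreamOutput();
        oldOut.setVersion(Version.V_7_10_0);
        ShardLongFieldRange.of(1000L, 2000L).writeTo(oldOut);
        StreamInput oldIn = oldOut.bytes().streamInput();
        oldIn.setVersion(Version.V_7_10_0);
        System.out.println(ShardLongFieldRange.readFrom(oldIn) == ShardLongFieldRange.UNKNOWN); // true
    }
}
```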
shard state is [" + state + "], mark shard as started", - SHARD_STATE_ACTION_LISTENER, clusterState); + shardStateAction.shardStarted( + shardRouting, + primaryTerm, + "master " + nodes.getMasterNode() + " marked shard as initializing, but shard state is [" + state + + "], mark shard as started", + shard.getTimestampMillisRange(), + SHARD_STATE_ACTION_LISTENER, + clusterState); } } } @@ -691,8 +697,13 @@ private RecoveryListener(final ShardRouting shardRouting, final long primaryTerm } @Override - public void onRecoveryDone(final RecoveryState state) { - shardStateAction.shardStarted(shardRouting, primaryTerm, "after " + state.getRecoverySource(), SHARD_STATE_ACTION_LISTENER); + public void onRecoveryDone(final RecoveryState state, ShardLongFieldRange timestampMillisFieldRange) { + shardStateAction.shardStarted( + shardRouting, + primaryTerm, + "after " + state.getRecoverySource(), + timestampMillisFieldRange, + SHARD_STATE_ACTION_LISTENER); } @Override @@ -784,6 +795,13 @@ public interface Shard { */ RecoveryState recoveryState(); + /** + * @return the range of the {@code @timestamp} field for this shard, in milliseconds since the epoch, or {@link + * ShardLongFieldRange#EMPTY} if this field is not found, or {@link ShardLongFieldRange#UNKNOWN} if its range is not fixed. + */ + @Nullable + ShardLongFieldRange getTimestampMillisRange(); + /** * Updates the shard state based on an incoming cluster state: * - Updates and persists the new routing value. diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java index 279e9b1d35220..8d064f7661ec0 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java @@ -51,6 +51,7 @@ import org.elasticsearch.index.shard.IllegalIndexShardStateException; import org.elasticsearch.index.shard.IndexEventListener; import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.ShardLongFieldRange; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardNotFoundException; import org.elasticsearch.index.store.Store; @@ -272,7 +273,7 @@ public static StartRecoveryRequest getStartRecoveryRequest(Logger logger, Discov } public interface RecoveryListener { - void onRecoveryDone(RecoveryState state); + void onRecoveryDone(RecoveryState state, ShardLongFieldRange timestampMillisFieldRange); void onRecoveryFailure(RecoveryState state, RecoveryFailedException e, boolean sendShardFailure); } diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java index 276539d18295b..22efbd9a90249 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java @@ -257,7 +257,7 @@ public void markAsDone() { // release the initial reference. 
recovery files will be cleaned as soon as ref count goes to zero, potentially now decRef(); } - listener.onRecoveryDone(state()); + listener.onRecoveryDone(state(), indexShard.getTimestampMillisRange()); } } diff --git a/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java b/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java index 3cbe788984fb4..6651139a23d3b 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java @@ -69,6 +69,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.shard.IndexLongFieldRange; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.ShardLimitValidator; @@ -348,7 +349,8 @@ public ClusterState execute(ClusterState currentState) { .index(renamedIndexName); indexMdBuilder.settings(Settings.builder() .put(snapshotIndexMetadata.getSettings()) - .put(IndexMetadata.SETTING_INDEX_UUID, UUIDs.randomBase64UUID())); + .put(IndexMetadata.SETTING_INDEX_UUID, UUIDs.randomBase64UUID())) + .timestampMillisRange(IndexLongFieldRange.NO_SHARDS); shardLimitValidator.validateShardLimit(snapshotIndexMetadata.getSettings(), currentState); if (!request.includeAliases() && !snapshotIndexMetadata.getAliases().isEmpty()) { // Remove all aliases - they shouldn't be restored @@ -381,6 +383,7 @@ public ClusterState execute(ClusterState currentState) { 1 + currentIndexMetadata.getSettingsVersion())); indexMdBuilder.aliasesVersion( Math.max(snapshotIndexMetadata.getAliasesVersion(), 1 + currentIndexMetadata.getAliasesVersion())); + indexMdBuilder.timestampMillisRange(IndexLongFieldRange.NO_SHARDS); for (int shard = 0; shard < snapshotIndexMetadata.getNumberOfShards(); shard++) { indexMdBuilder.primaryTerm(shard, diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/reroute/ClusterRerouteResponseTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/reroute/ClusterRerouteResponseTests.java index 7109e2fa282b3..704f1c0416490 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/reroute/ClusterRerouteResponseTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/reroute/ClusterRerouteResponseTests.java @@ -122,7 +122,10 @@ public void testToXContent() throws IOException { " \"0\" : [ ]\n" + " },\n" + " \"rollover_info\" : { },\n" + - " \"system\" : false\n" + + " \"system\" : false,\n" + + " \"timestamp_range\" : {\n" + + " \"shards\" : [ ]\n" + + " }\n" + " }\n" + " },\n" + " \"index-graveyard\" : {\n" + @@ -220,7 +223,10 @@ public void testToXContent() throws IOException { " \"0\" : [ ]\n" + " },\n" + " \"rollover_info\" : { },\n" + - " \"system\" : false\n" + + " \"system\" : false,\n" + + " \"timestamp_range\" : {\n" + + " \"shards\" : [ ]\n" + + " }\n" + " }\n" + " },\n" + " \"index-graveyard\" : {\n" + diff --git a/server/src/test/java/org/elasticsearch/cluster/ClusterStateTests.java b/server/src/test/java/org/elasticsearch/cluster/ClusterStateTests.java index 4ee23f298209f..b3913206b5bdd 100644 --- a/server/src/test/java/org/elasticsearch/cluster/ClusterStateTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/ClusterStateTests.java @@ -245,7 +245,10 @@ public void testToXContent() throws IOException { " \"time\" : 1\n" + " }\n" + " },\n" + - " \"system\" : false\n" + + " \"system\" 
: false,\n" + + " \"timestamp_range\" : {\n" + + " \"shards\" : [ ]\n" + + " }\n" + " }\n" + " },\n" + " \"index-graveyard\" : {\n" + @@ -429,7 +432,10 @@ public void testToXContent_FlatSettingTrue_ReduceMappingFalse() throws IOExcepti " \"time\" : 1\n" + " }\n" + " },\n" + - " \"system\" : false\n" + + " \"system\" : false,\n" + + " \"timestamp_range\" : {\n" + + " \"shards\" : [ ]\n" + + " }\n" + " }\n" + " },\n" + " \"index-graveyard\" : {\n" + @@ -622,7 +628,10 @@ public void testToXContent_FlatSettingFalse_ReduceMappingTrue() throws IOExcepti " \"time\" : 1\n" + " }\n" + " },\n" + - " \"system\" : false\n" + + " \"system\" : false,\n" + + " \"timestamp_range\" : {\n" + + " \"shards\" : [ ]\n" + + " }\n" + " }\n" + " },\n" + " \"index-graveyard\" : {\n" + @@ -749,7 +758,10 @@ public void testToXContentSameTypeName() throws IOException { " \"0\" : [ ]\n" + " },\n" + " \"rollover_info\" : { },\n" + - " \"system\" : false\n" + + " \"system\" : false,\n" + + " \"timestamp_range\" : {\n" + + " \"shards\" : [ ]\n" + + " }\n" + " }\n" + " },\n" + " \"index-graveyard\" : {\n" + diff --git a/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStartedClusterStateTaskExecutorTests.java b/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStartedClusterStateTaskExecutorTests.java index e318a239e3339..d0fef4d02f8d0 100644 --- a/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStartedClusterStateTaskExecutorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStartedClusterStateTaskExecutorTests.java @@ -32,7 +32,9 @@ import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.common.Priority; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.shard.IndexLongFieldRange; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.shard.ShardLongFieldRange; import java.util.ArrayList; import java.util.Collections; @@ -50,6 +52,7 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.sameInstance; public class ShardStartedClusterStateTaskExecutorTests extends ESAllocationTestCase { @@ -78,7 +81,12 @@ public void testEmptyTaskListProducesSameClusterState() throws Exception { public void testNonExistentIndexMarkedAsSuccessful() throws Exception { final ClusterState clusterState = stateWithNoShard(); - final StartedShardEntry entry = new StartedShardEntry(new ShardId("test", "_na", 0), "aId", randomNonNegativeLong(), "test"); + final StartedShardEntry entry = new StartedShardEntry( + new ShardId("test", "_na", 0), + "aId", + randomNonNegativeLong(), + "test", + ShardLongFieldRange.UNKNOWN); final ClusterStateTaskExecutor.ClusterTasksResult result = executeTasks(clusterState, singletonList(entry)); assertSame(clusterState, result.resultingState); @@ -95,10 +103,20 @@ public void testNonExistentShardsAreMarkedAsSuccessful() throws Exception { final List tasks = Stream.concat( // Existent shard id but different allocation id IntStream.range(0, randomIntBetween(1, 5)) - .mapToObj(i -> new StartedShardEntry(new ShardId(indexMetadata.getIndex(), 0), String.valueOf(i), 0L, "allocation id")), + .mapToObj(i -> new StartedShardEntry( + new ShardId(indexMetadata.getIndex(), 0), + String.valueOf(i), + 0L, + "allocation id", + ShardLongFieldRange.UNKNOWN)), // Non existent shard id IntStream.range(1, randomIntBetween(2, 5)) - .mapToObj(i -> new 
StartedShardEntry(new ShardId(indexMetadata.getIndex(), i), String.valueOf(i), 0L, "shard id")) + .mapToObj(i -> new StartedShardEntry( + new ShardId(indexMetadata.getIndex(), i), + String.valueOf(i), + 0L, + "shard id", + ShardLongFieldRange.UNKNOWN)) ).collect(Collectors.toList()); @@ -127,7 +145,7 @@ public void testNonInitializingShardAreMarkedAsSuccessful() throws Exception { allocationId = shardRoutingTable.replicaShards().iterator().next().allocationId().getId(); } final long primaryTerm = indexMetadata.primaryTerm(shardId.id()); - return new StartedShardEntry(shardId, allocationId, primaryTerm, "test"); + return new StartedShardEntry(shardId, allocationId, primaryTerm, "test", ShardLongFieldRange.UNKNOWN); }).collect(Collectors.toList()); final ClusterStateTaskExecutor.ClusterTasksResult result = executeTasks(clusterState, tasks); @@ -150,11 +168,11 @@ public void testStartedShards() throws Exception { final String primaryAllocationId = primaryShard.allocationId().getId(); final List tasks = new ArrayList<>(); - tasks.add(new StartedShardEntry(shardId, primaryAllocationId, primaryTerm, "test")); + tasks.add(new StartedShardEntry(shardId, primaryAllocationId, primaryTerm, "test", ShardLongFieldRange.UNKNOWN)); if (randomBoolean()) { final ShardRouting replicaShard = clusterState.routingTable().shardRoutingTable(shardId).replicaShards().iterator().next(); final String replicaAllocationId = replicaShard.allocationId().getId(); - tasks.add(new StartedShardEntry(shardId, replicaAllocationId, primaryTerm, "test")); + tasks.add(new StartedShardEntry(shardId, replicaAllocationId, primaryTerm, "test", ShardLongFieldRange.UNKNOWN)); } final ClusterStateTaskExecutor.ClusterTasksResult result = executeTasks(clusterState, tasks); assertNotSame(clusterState, result.resultingState); @@ -179,7 +197,7 @@ public void testDuplicateStartsAreOkay() throws Exception { final long primaryTerm = indexMetadata.primaryTerm(shardId.id()); final List tasks = IntStream.range(0, randomIntBetween(2, 10)) - .mapToObj(i -> new StartedShardEntry(shardId, allocationId, primaryTerm, "test")) + .mapToObj(i -> new StartedShardEntry(shardId, allocationId, primaryTerm, "test", ShardLongFieldRange.UNKNOWN)) .collect(Collectors.toList()); final ClusterStateTaskExecutor.ClusterTasksResult result = executeTasks(clusterState, tasks); @@ -210,8 +228,12 @@ public void testPrimaryTermsMismatch() throws Exception { final ShardId shardId = new ShardId(clusterState.metadata().index(indexName).getIndex(), shard); final String primaryAllocationId = clusterState.routingTable().shardRoutingTable(shardId).primaryShard().allocationId().getId(); { - final StartedShardEntry task = - new StartedShardEntry(shardId, primaryAllocationId, primaryTerm - 1, "primary terms does not match on primary"); + final StartedShardEntry task = new StartedShardEntry( + shardId, + primaryAllocationId, + primaryTerm - 1, + "primary terms does not match on primary", + ShardLongFieldRange.UNKNOWN); final ClusterStateTaskExecutor.ClusterTasksResult result = executeTasks(clusterState, singletonList(task)); assertSame(clusterState, result.resultingState); @@ -223,8 +245,8 @@ public void testPrimaryTermsMismatch() throws Exception { assertSame(clusterState, result.resultingState); } { - final StartedShardEntry task = - new StartedShardEntry(shardId, primaryAllocationId, primaryTerm, "primary terms match on primary"); + final StartedShardEntry task = new StartedShardEntry( + shardId, primaryAllocationId, primaryTerm, "primary terms match on primary", 
ShardLongFieldRange.UNKNOWN); final ClusterStateTaskExecutor.ClusterTasksResult result = executeTasks(clusterState, singletonList(task)); assertNotSame(clusterState, result.resultingState); @@ -241,7 +263,8 @@ public void testPrimaryTermsMismatch() throws Exception { final String replicaAllocationId = clusterState.routingTable().shardRoutingTable(shardId).replicaShards().iterator().next() .allocationId().getId(); - final StartedShardEntry task = new StartedShardEntry(shardId, replicaAllocationId, replicaPrimaryTerm, "test on replica"); + final StartedShardEntry task = new StartedShardEntry( + shardId, replicaAllocationId, replicaPrimaryTerm, "test on replica", ShardLongFieldRange.UNKNOWN); final ClusterStateTaskExecutor.ClusterTasksResult result = executeTasks(clusterState, singletonList(task)); assertNotSame(clusterState, result.resultingState); @@ -254,6 +277,51 @@ public void testPrimaryTermsMismatch() throws Exception { } } + public void testExpandsTimestampRange() throws Exception { + final String indexName = "test"; + final ClusterState clusterState = state(indexName, randomBoolean(), ShardRoutingState.INITIALIZING, ShardRoutingState.INITIALIZING); + + final IndexMetadata indexMetadata = clusterState.metadata().index(indexName); + final ShardId shardId = new ShardId(indexMetadata.getIndex(), 0); + final long primaryTerm = indexMetadata.primaryTerm(shardId.id()); + final ShardRouting primaryShard = clusterState.routingTable().shardRoutingTable(shardId).primaryShard(); + final String primaryAllocationId = primaryShard.allocationId().getId(); + + assertThat(indexMetadata.getTimestampMillisRange(), sameInstance(IndexLongFieldRange.NO_SHARDS)); + + final ShardLongFieldRange shardTimestampMillisRange = randomBoolean() ? ShardLongFieldRange.UNKNOWN : + randomBoolean() ? 
ShardLongFieldRange.EMPTY : ShardLongFieldRange.of(1606407943000L, 1606407944000L); + + final List tasks = new ArrayList<>(); + tasks.add(new StartedShardEntry(shardId, primaryAllocationId, primaryTerm, "test", shardTimestampMillisRange)); + if (randomBoolean()) { + final ShardRouting replicaShard = clusterState.routingTable().shardRoutingTable(shardId).replicaShards().iterator().next(); + final String replicaAllocationId = replicaShard.allocationId().getId(); + tasks.add(new StartedShardEntry(shardId, replicaAllocationId, primaryTerm, "test", shardTimestampMillisRange)); + } + final ClusterStateTaskExecutor.ClusterTasksResult result = executeTasks(clusterState, tasks); + assertNotSame(clusterState, result.resultingState); + assertThat(result.executionResults.size(), equalTo(tasks.size())); + tasks.forEach(task -> { + assertThat(result.executionResults.containsKey(task), is(true)); + assertThat(((ClusterStateTaskExecutor.TaskResult) result.executionResults.get(task)).isSuccess(), is(true)); + + final IndexShardRoutingTable shardRoutingTable = result.resultingState.routingTable().shardRoutingTable(task.shardId); + assertThat(shardRoutingTable.getByAllocationId(task.allocationId).state(), is(ShardRoutingState.STARTED)); + + final IndexLongFieldRange timestampMillisRange = result.resultingState.metadata().index(indexName).getTimestampMillisRange(); + if (shardTimestampMillisRange == ShardLongFieldRange.UNKNOWN) { + assertThat(timestampMillisRange, sameInstance(IndexLongFieldRange.UNKNOWN)); + } else if (shardTimestampMillisRange == ShardLongFieldRange.EMPTY) { + assertThat(timestampMillisRange, sameInstance(IndexLongFieldRange.EMPTY)); + } else { + assertTrue(timestampMillisRange.isComplete()); + assertThat(timestampMillisRange.getMin(), equalTo(shardTimestampMillisRange.getMin())); + assertThat(timestampMillisRange.getMax(), equalTo(shardTimestampMillisRange.getMax())); + } + }); + } + private ClusterStateTaskExecutor.ClusterTasksResult executeTasks(final ClusterState state, final List tasks) throws Exception { final ClusterStateTaskExecutor.ClusterTasksResult result = executor.execute(state, tasks); diff --git a/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java b/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java index d7d547503e2a5..fe1c9aef10395 100644 --- a/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java @@ -42,7 +42,9 @@ import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.index.shard.ShardLongFieldRange; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.shard.ShardLongFieldRangeWireTests; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.VersionUtils; import org.elasticsearch.test.transport.CapturingTransport; @@ -76,6 +78,7 @@ import static org.elasticsearch.test.VersionUtils.randomCompatibleVersion; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.CoreMatchers.sameInstance; import static org.hamcrest.Matchers.arrayWithSize; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.is; @@ -424,7 +427,7 @@ public void testShardStarted() throws InterruptedException { final 
ShardRouting shardRouting = getRandomShardRouting(index); final long primaryTerm = clusterService.state().metadata().index(shardRouting.index()).primaryTerm(shardRouting.id()); final TestListener listener = new TestListener(); - shardStateAction.shardStarted(shardRouting, primaryTerm, "testShardStarted", listener); + shardStateAction.shardStarted(shardRouting, primaryTerm, "testShardStarted", ShardLongFieldRange.UNKNOWN, listener); final CapturingTransport.CapturedRequest[] capturedRequests = transport.getCapturedRequestsAndClear(); assertThat(capturedRequests[0].request, instanceOf(ShardStateAction.StartedShardEntry.class)); @@ -433,6 +436,7 @@ public void testShardStarted() throws InterruptedException { assertThat(entry.shardId, equalTo(shardRouting.shardId())); assertThat(entry.allocationId, equalTo(shardRouting.allocationId().getId())); assertThat(entry.primaryTerm, equalTo(primaryTerm)); + assertThat(entry.timestampMillisRange, sameInstance(ShardLongFieldRange.UNKNOWN)); transport.handleResponse(capturedRequests[0].requestId, TransportResponse.Empty.INSTANCE); listener.await(); @@ -485,7 +489,8 @@ public void testShardEntryBWCSerialize() throws Exception { final ShardId shardId = new ShardId(randomRealisticUnicodeOfLengthBetween(10, 100), UUID.randomUUID().toString(), between(0, 1000)); final String allocationId = randomRealisticUnicodeOfCodepointLengthBetween(10, 100); final String reason = randomRealisticUnicodeOfCodepointLengthBetween(10, 100); - try (StreamInput in = serialize(new StartedShardEntry(shardId, allocationId, 0L, reason), bwcVersion).streamInput()) { + final StartedShardEntry originalEntry = new StartedShardEntry(shardId, allocationId, 0L, reason, ShardLongFieldRange.UNKNOWN); + try (StreamInput in = serialize(originalEntry, bwcVersion).streamInput()) { in.setVersion(bwcVersion); final FailedShardEntry failedShardEntry = new FailedShardEntry(in); assertThat(failedShardEntry.shardId, equalTo(shardId)); @@ -539,7 +544,14 @@ public void testStartedShardEntrySerialization() throws Exception { final String message = randomRealisticUnicodeOfCodepointLengthBetween(10, 100); final Version version = randomFrom(randomCompatibleVersion(random(), Version.CURRENT)); - try (StreamInput in = serialize(new StartedShardEntry(shardId, allocationId, primaryTerm, message), version).streamInput()) { + final ShardLongFieldRange timestampMillisRange = ShardLongFieldRangeWireTests.randomRange(); + final StartedShardEntry startedShardEntry = new StartedShardEntry( + shardId, + allocationId, + primaryTerm, + message, + timestampMillisRange); + try (StreamInput in = serialize(startedShardEntry, version).streamInput()) { in.setVersion(version); final StartedShardEntry deserialized = new StartedShardEntry(in); assertThat(deserialized.shardId, equalTo(shardId)); @@ -550,6 +562,8 @@ public void testStartedShardEntrySerialization() throws Exception { assertThat(deserialized.primaryTerm, equalTo(0L)); } assertThat(deserialized.message, equalTo(message)); + assertThat(deserialized.timestampMillisRange, version.onOrAfter(ShardLongFieldRange.LONG_FIELD_RANGE_VERSION_INTRODUCED) ? 
+ equalTo(timestampMillisRange) : sameInstance(ShardLongFieldRange.UNKNOWN)); } } diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/ToAndFromJsonMetadataTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/ToAndFromJsonMetadataTests.java index bb4dc637cf832..974d5a8373b29 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/ToAndFromJsonMetadataTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/ToAndFromJsonMetadataTests.java @@ -301,7 +301,10 @@ public void testToXContentAPI_SameTypeName() throws IOException { " \"0\" : [ ]\n" + " },\n" + " \"rollover_info\" : { },\n" + - " \"system\" : false\n" + + " \"system\" : false,\n" + + " \"timestamp_range\" : {\n" + + " \"shards\" : [ ]\n" + + " }\n" + " }\n" + " },\n" + " \"index-graveyard\" : {\n" + @@ -464,7 +467,10 @@ public void testToXContentAPI_FlatSettingTrue_ReduceMappingFalse() throws IOExce " \"time\" : 1\n" + " }\n" + " },\n" + - " \"system\" : false\n" + + " \"system\" : false,\n" + + " \"timestamp_range\" : {\n" + + " \"shards\" : [ ]\n" + + " }\n" + " }\n" + " },\n" + " \"index-graveyard\" : {\n" + @@ -570,7 +576,10 @@ public void testToXContentAPI_FlatSettingFalse_ReduceMappingTrue() throws IOExce " \"time\" : 1\n" + " }\n" + " },\n" + - " \"system\" : false\n" + + " \"system\" : false,\n" + + " \"timestamp_range\" : {\n" + + " \"shards\" : [ ]\n" + + " }\n" + " }\n" + " },\n" + " \"index-graveyard\" : {\n" + diff --git a/server/src/test/java/org/elasticsearch/index/mapper/DateFieldMapperTests.java b/server/src/test/java/org/elasticsearch/index/mapper/DateFieldMapperTests.java index 68e5b67ba4b60..29cdccdebfb0c 100644 --- a/server/src/test/java/org/elasticsearch/index/mapper/DateFieldMapperTests.java +++ b/server/src/test/java/org/elasticsearch/index/mapper/DateFieldMapperTests.java @@ -24,6 +24,7 @@ import org.elasticsearch.bootstrap.JavaVersion; import org.elasticsearch.common.collect.List; import org.elasticsearch.common.time.DateFormatter; +import org.elasticsearch.common.time.DateUtils; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.index.termvectors.TermVectorsService; import org.elasticsearch.search.DocValueFormat; @@ -34,6 +35,11 @@ import java.time.ZonedDateTime; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.lessThan; +import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.hamcrest.Matchers.notNullValue; public class DateFieldMapperTests extends MapperTestCase { @@ -328,4 +334,33 @@ public void testFetchDocValuesNanos() throws IOException { assertEquals(List.of(date), fetchFromDocValues(mapperService, ft, format, date)); assertEquals(List.of("2020-05-15T21:33:02.123Z"), fetchFromDocValues(mapperService, ft, format, 1589578382123L)); } + + public void testResolutionRounding() { + final long millis = randomLong(); + assertThat(DateFieldMapper.Resolution.MILLISECONDS.roundDownToMillis(millis), equalTo(millis)); + assertThat(DateFieldMapper.Resolution.MILLISECONDS.roundUpToMillis(millis), equalTo(millis)); + + final long nanos = randomNonNegativeLong(); + final long down = DateFieldMapper.Resolution.NANOSECONDS.roundDownToMillis(nanos); + assertThat(DateUtils.toNanoSeconds(down), lessThanOrEqualTo(nanos)); + try { + assertThat(DateUtils.toNanoSeconds(down + 1), greaterThan(nanos)); + } catch 
(IllegalArgumentException e) { + // ok, down+1 was out of range + } + + final long up = DateFieldMapper.Resolution.NANOSECONDS.roundUpToMillis(nanos); + try { + assertThat(DateUtils.toNanoSeconds(up), greaterThanOrEqualTo(nanos)); + } catch (IllegalArgumentException e) { + // ok, up may be out of range by 1; we check that up-1 is in range below (as long as it's >0) + assertThat(up, greaterThan(0L)); + } + + if (up > 0) { + assertThat(DateUtils.toNanoSeconds(up - 1), lessThan(nanos)); + } else { + assertThat(up, equalTo(0L)); + } + } } diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexLongFieldRangeTestUtils.java b/server/src/test/java/org/elasticsearch/index/shard/IndexLongFieldRangeTestUtils.java new file mode 100644 index 0000000000000..14597cfa8625d --- /dev/null +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexLongFieldRangeTestUtils.java @@ -0,0 +1,77 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.shard; + +import org.elasticsearch.test.ESTestCase; + +import static org.elasticsearch.test.ESTestCase.randomBoolean; +import static org.junit.Assert.assertSame; + +public class IndexLongFieldRangeTestUtils { + + static IndexLongFieldRange randomRange() { + switch (ESTestCase.between(1, 3)) { + case 1: + return IndexLongFieldRange.UNKNOWN; + case 2: + return IndexLongFieldRange.EMPTY; + case 3: + return randomSpecificRange(); + default: + throw new AssertionError("impossible"); + } + } + + static IndexLongFieldRange randomSpecificRange() { + return randomSpecificRange(null); + } + + static IndexLongFieldRange randomSpecificRange(Boolean complete) { + IndexLongFieldRange range = IndexLongFieldRange.NO_SHARDS; + + final int shardCount = ESTestCase.between(1, 5); + for (int i = 0; i < shardCount; i++) { + if (Boolean.FALSE.equals(complete) && range.getShards().length == shardCount - 1) { + // caller requested an incomplete range so we must skip the last shard + break; + } else if (Boolean.TRUE.equals(complete) || randomBoolean()) { + range = range.extendWithShardRange( + i, + shardCount, + randomBoolean() ? 
ShardLongFieldRange.EMPTY : ShardLongFieldRangeWireTests.randomSpecificRange()); + } + } + + assert range != IndexLongFieldRange.UNKNOWN; + assert complete == null || complete.equals(range.isComplete()); + return range; + } + + static boolean checkForSameInstances(IndexLongFieldRange expected, IndexLongFieldRange actual) { + final boolean expectSame = expected == IndexLongFieldRange.UNKNOWN + || expected == IndexLongFieldRange.EMPTY + || expected == IndexLongFieldRange.NO_SHARDS; + if (expectSame) { + assertSame(expected, actual); + } + return expectSame; + } + +} diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexLongFieldRangeTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexLongFieldRangeTests.java new file mode 100644 index 0000000000000..7a41b192424f6 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexLongFieldRangeTests.java @@ -0,0 +1,133 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.shard; + +import org.elasticsearch.test.ESTestCase; + +import java.util.Arrays; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.elasticsearch.index.shard.IndexLongFieldRangeTestUtils.randomSpecificRange; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.sameInstance; + +public class IndexLongFieldRangeTests extends ESTestCase { + + public void testUnknownShardImpliesUnknownIndex() { + final IndexLongFieldRange range = randomSpecificRange(false); + assertThat(range.extendWithShardRange( + IntStream.of(range.getShards()).max().orElse(0) + 1, + between(1, 10), + ShardLongFieldRange.UNKNOWN), + sameInstance(IndexLongFieldRange.UNKNOWN)); + } + + public void testExtendWithKnownShardIsNoOp() { + IndexLongFieldRange range = randomSpecificRange(); + if (range == IndexLongFieldRange.NO_SHARDS) { + // need at least one known shard + range = range.extendWithShardRange(between(0, 5), 5, ShardLongFieldRange.EMPTY); + } + + final ShardLongFieldRange shardRange; + if (range.getMinUnsafe() == IndexLongFieldRange.EMPTY.getMinUnsafe() + && range.getMaxUnsafe() == IndexLongFieldRange.EMPTY.getMaxUnsafe()) { + shardRange = ShardLongFieldRange.EMPTY; + } else { + final long min = randomLongBetween(range.getMinUnsafe(), range.getMaxUnsafe()); + final long max = randomLongBetween(min, range.getMaxUnsafe()); + shardRange = randomBoolean() ? ShardLongFieldRange.EMPTY : ShardLongFieldRange.of(min, max); + } + + assertThat(range.extendWithShardRange( + range.isComplete() ? 
between(1, 10) : randomFrom(IntStream.of(range.getShards()).boxed().collect(Collectors.toList())), + between(1, 10), + shardRange), + sameInstance(range)); + } + + public void testExtendUnknownRangeIsNoOp() { + assertThat(IndexLongFieldRange.UNKNOWN.extendWithShardRange( + between(0, 10), + between(0, 10), + ShardLongFieldRangeWireTests.randomRange()), + sameInstance(IndexLongFieldRange.UNKNOWN)); + } + + public void testCompleteEmptyRangeIsEmptyInstance() { + final int shardCount = between(1, 5); + IndexLongFieldRange range = IndexLongFieldRange.NO_SHARDS; + for (int i = 0; i < shardCount; i++) { + assertFalse(range.isComplete()); + range = range.extendWithShardRange(i, shardCount, ShardLongFieldRange.EMPTY); + } + assertThat(range, sameInstance(IndexLongFieldRange.EMPTY)); + assertTrue(range.isComplete()); + } + + public void testIsCompleteWhenAllShardRangesIncluded() { + final int shardCount = between(1, 5); + IndexLongFieldRange range = IndexLongFieldRange.NO_SHARDS; + long min = Long.MAX_VALUE; + long max = Long.MIN_VALUE; + for (int i = 0; i < shardCount; i++) { + assertFalse(range.isComplete()); + final ShardLongFieldRange shardFieldRange; + if (randomBoolean()) { + shardFieldRange = ShardLongFieldRange.EMPTY; + } else { + shardFieldRange = ShardLongFieldRangeWireTests.randomSpecificRange(); + min = Math.min(min, shardFieldRange.getMin()); + max = Math.max(max, shardFieldRange.getMax()); + } + range = range.extendWithShardRange( + i, + shardCount, + shardFieldRange); + } + assertTrue(range.isComplete()); + if (range != IndexLongFieldRange.EMPTY) { + assertThat(range.getMin(), equalTo(min)); + assertThat(range.getMax(), equalTo(max)); + } else { + assertThat(min, equalTo(Long.MAX_VALUE)); + assertThat(max, equalTo(Long.MIN_VALUE)); + } + } + + public void testCanRemoveShardRange() { + assertThat(IndexLongFieldRange.UNKNOWN.removeShard(between(0, 4), 5), sameInstance(IndexLongFieldRange.UNKNOWN)); + assertThat(IndexLongFieldRange.UNKNOWN.removeShard(0, 1), sameInstance(IndexLongFieldRange.NO_SHARDS)); + + final IndexLongFieldRange initialRange = randomSpecificRange(); + final int shardCount = initialRange.isComplete() + ? between(1, 5) : Arrays.stream(initialRange.getShards()).max().orElse(0) + between(1, 3); + + final int shard = between(0, shardCount - 1); + final IndexLongFieldRange rangeWithoutShard = initialRange.removeShard(shard, shardCount); + assertFalse(rangeWithoutShard.isComplete()); + assertTrue(Arrays.stream(rangeWithoutShard.getShards()).noneMatch(i -> i == shard)); + if (rangeWithoutShard != IndexLongFieldRange.NO_SHARDS) { + assertThat(rangeWithoutShard.getMinUnsafe(), equalTo(initialRange.getMinUnsafe())); + assertThat(rangeWithoutShard.getMaxUnsafe(), equalTo(initialRange.getMaxUnsafe())); + } + } +} diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexLongFieldRangeWireTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexLongFieldRangeWireTests.java new file mode 100644 index 0000000000000..81f5940fcafd2 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexLongFieldRangeWireTests.java @@ -0,0 +1,70 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.shard; + +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.test.AbstractWireSerializingTestCase; + +import java.io.IOException; +import java.util.Arrays; + +import static org.elasticsearch.index.shard.IndexLongFieldRangeTestUtils.checkForSameInstances; +import static org.elasticsearch.index.shard.IndexLongFieldRangeTestUtils.randomRange; + +public class IndexLongFieldRangeWireTests extends AbstractWireSerializingTestCase { + @Override + protected Writeable.Reader instanceReader() { + return IndexLongFieldRange::readFrom; + } + + @Override + protected IndexLongFieldRange createTestInstance() { + return randomRange(); + } + + @Override + protected IndexLongFieldRange mutateInstance(IndexLongFieldRange instance) throws IOException { + if (instance == IndexLongFieldRange.UNKNOWN) { + return IndexLongFieldRangeTestUtils.randomSpecificRange(); + } + + if (randomBoolean()) { + return IndexLongFieldRange.UNKNOWN; + } + + while (true) { + final IndexLongFieldRange newInstance = IndexLongFieldRangeTestUtils.randomSpecificRange(); + if (newInstance.getMinUnsafe() != instance.getMinUnsafe() + || newInstance.getMaxUnsafe() != instance.getMaxUnsafe() + || Arrays.equals(newInstance.getShards(), instance.getShards()) == false) { + return newInstance; + } + } + } + + + @Override + protected void assertEqualInstances(IndexLongFieldRange expectedInstance, IndexLongFieldRange newInstance) { + if (checkForSameInstances(expectedInstance, newInstance) == false) { + super.assertEqualInstances(expectedInstance, newInstance); + } + } + +} diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexLongFieldRangeXContentTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexLongFieldRangeXContentTests.java new file mode 100644 index 0000000000000..f48346950e6bc --- /dev/null +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexLongFieldRangeXContentTests.java @@ -0,0 +1,54 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.index.shard; + +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.test.AbstractXContentTestCase; + +import java.io.IOException; + +import static org.elasticsearch.index.shard.IndexLongFieldRangeTestUtils.checkForSameInstances; +import static org.elasticsearch.index.shard.IndexLongFieldRangeTestUtils.randomRange; +import static org.hamcrest.Matchers.sameInstance; + +public class IndexLongFieldRangeXContentTests extends AbstractXContentTestCase { + @Override + protected IndexLongFieldRange createTestInstance() { + return randomRange(); + } + + @Override + protected IndexLongFieldRange doParseInstance(XContentParser parser) throws IOException { + assertThat(parser.nextToken(), sameInstance(XContentParser.Token.START_OBJECT)); + return IndexLongFieldRange.fromXContent(parser); + } + + @Override + protected boolean supportsUnknownFields() { + return false; + } + + @Override + protected void assertEqualInstances(IndexLongFieldRange expectedInstance, IndexLongFieldRange newInstance) { + if (checkForSameInstances(expectedInstance, newInstance) == false) { + super.assertEqualInstances(expectedInstance, newInstance); + } + } +} diff --git a/server/src/test/java/org/elasticsearch/index/shard/ShardLongFieldRangeWireTests.java b/server/src/test/java/org/elasticsearch/index/shard/ShardLongFieldRangeWireTests.java new file mode 100644 index 0000000000000..ffd523c4c2377 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/index/shard/ShardLongFieldRangeWireTests.java @@ -0,0 +1,94 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.shard; + +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.test.AbstractWireSerializingTestCase; + +import java.io.IOException; + +public class ShardLongFieldRangeWireTests extends AbstractWireSerializingTestCase { + @Override + protected Writeable.Reader instanceReader() { + return ShardLongFieldRange::readFrom; + } + + @Override + protected ShardLongFieldRange createTestInstance() { + return randomRange(); + } + + public static ShardLongFieldRange randomRange() { + switch (between(1, 3)) { + case 1: + return ShardLongFieldRange.UNKNOWN; + case 2: + return ShardLongFieldRange.EMPTY; + case 3: + return randomSpecificRange(); + default: + throw new AssertionError("impossible"); + } + } + + static ShardLongFieldRange randomSpecificRange() { + final long min = randomLong(); + return ShardLongFieldRange.of(min, randomLongBetween(min, Long.MAX_VALUE)); + } + + @Override + protected ShardLongFieldRange mutateInstance(ShardLongFieldRange instance) throws IOException { + if (instance == ShardLongFieldRange.UNKNOWN) { + return randomBoolean() ? 
ShardLongFieldRange.EMPTY : randomSpecificRange(); + } + if (instance == ShardLongFieldRange.EMPTY) { + return randomBoolean() ? ShardLongFieldRange.UNKNOWN : randomSpecificRange(); + } + + switch (between(1, 4)) { + case 1: + return ShardLongFieldRange.UNKNOWN; + case 2: + return ShardLongFieldRange.EMPTY; + case 3: + return instance.getMin() == Long.MAX_VALUE + ? randomSpecificRange() + : ShardLongFieldRange.of(randomValueOtherThan(instance.getMin(), + () -> randomLongBetween(Long.MIN_VALUE, instance.getMax())), instance.getMax()); + case 4: + return instance.getMax() == Long.MIN_VALUE + ? randomSpecificRange() + : ShardLongFieldRange.of(instance.getMin(), randomValueOtherThan(instance.getMax(), + () -> randomLongBetween(instance.getMin(), Long.MAX_VALUE))); + default: + throw new AssertionError("impossible"); + } + } + + @Override + protected void assertEqualInstances(ShardLongFieldRange expectedInstance, ShardLongFieldRange newInstance) { + if (expectedInstance == ShardLongFieldRange.UNKNOWN || expectedInstance == ShardLongFieldRange.EMPTY) { + assertSame(expectedInstance, newInstance); + } else { + super.assertEqualInstances(expectedInstance, newInstance); + } + } + +} diff --git a/server/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java b/server/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java index 5e70f3fd735ae..70b1bae30dcbc 100644 --- a/server/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java +++ b/server/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java @@ -36,6 +36,7 @@ import org.elasticsearch.index.shard.IndexEventListener; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardState; +import org.elasticsearch.index.shard.ShardLongFieldRange; import org.elasticsearch.index.shard.PrimaryReplicaSyncer.ResyncTask; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndicesService; @@ -407,5 +408,11 @@ public void updateTerm(long newTerm) { } this.term = newTerm; } + + @Override + public ShardLongFieldRange getTimestampMillisRange() { + return ShardLongFieldRange.EMPTY; + } + } } diff --git a/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java b/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java index b1fa969dde889..8e867859a1ce8 100644 --- a/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java +++ b/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java @@ -89,6 +89,7 @@ import org.elasticsearch.index.IndexService; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.shard.IndexEventListener; +import org.elasticsearch.index.shard.ShardLongFieldRange; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.ShardLimitValidator; import org.elasticsearch.indices.SystemIndices; @@ -311,7 +312,12 @@ public ClusterState applyStartedShards(ClusterState clusterState, List startedShards) { return runTasks(shardStartedClusterStateTaskExecutor, clusterState, startedShards.entrySet().stream() - .map(e -> new StartedShardEntry(e.getKey().shardId(), e.getKey().allocationId().getId(), e.getValue(), "shard started")) + .map(e -> new StartedShardEntry( + e.getKey().shardId(), + e.getKey().allocationId().getId(), + e.getValue(), + "shard started", + ShardLongFieldRange.UNKNOWN)) 
.collect(Collectors.toList())); } diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java index ba8a8f2b1c6e5..4e0b87a389769 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java @@ -52,6 +52,7 @@ import org.elasticsearch.index.replication.RecoveryDuringReplicationTests; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.ShardLongFieldRange; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.translog.SnapshotMatchers; import org.elasticsearch.index.translog.Translog; @@ -451,7 +452,7 @@ public long addDocument(Iterable doc) throws IOExcepti expectThrows(Exception.class, () -> group.recoverReplica(replica, (shard, sourceNode) -> new RecoveryTarget(shard, sourceNode, new PeerRecoveryTargetService.RecoveryListener() { @Override - public void onRecoveryDone(RecoveryState state) { + public void onRecoveryDone(RecoveryState state, ShardLongFieldRange timestampMillisFieldRange) { throw new AssertionError("recovery must fail"); } diff --git a/server/src/test/java/org/elasticsearch/recovery/RecoveriesCollectionTests.java b/server/src/test/java/org/elasticsearch/recovery/RecoveriesCollectionTests.java index a3b908fae5c9d..b58bd3fbf6396 100644 --- a/server/src/test/java/org/elasticsearch/recovery/RecoveriesCollectionTests.java +++ b/server/src/test/java/org/elasticsearch/recovery/RecoveriesCollectionTests.java @@ -23,6 +23,7 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.replication.ESIndexLevelReplicationTestCase; import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.ShardLongFieldRange; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.store.Store; import org.elasticsearch.indices.recovery.RecoveriesCollection; @@ -41,7 +42,7 @@ public class RecoveriesCollectionTests extends ESIndexLevelReplicationTestCase { static final PeerRecoveryTargetService.RecoveryListener listener = new PeerRecoveryTargetService.RecoveryListener() { @Override - public void onRecoveryDone(RecoveryState state) { + public void onRecoveryDone(RecoveryState state, ShardLongFieldRange timestampMillisFieldRange) { } @@ -76,7 +77,7 @@ public void testRecoveryTimeout() throws Exception { final long recoveryId = startRecovery(collection, shards.getPrimaryNode(), shards.addReplica(), new PeerRecoveryTargetService.RecoveryListener() { @Override - public void onRecoveryDone(RecoveryState state) { + public void onRecoveryDone(RecoveryState state, ShardLongFieldRange timestampMillisFieldRange) { latch.countDown(); } diff --git a/test/framework/src/main/java/org/elasticsearch/action/support/replication/ClusterStateCreationUtils.java b/test/framework/src/main/java/org/elasticsearch/action/support/replication/ClusterStateCreationUtils.java index bdc101977afc0..a0faed79d6dbc 100644 --- a/test/framework/src/main/java/org/elasticsearch/action/support/replication/ClusterStateCreationUtils.java +++ b/test/framework/src/main/java/org/elasticsearch/action/support/replication/ClusterStateCreationUtils.java @@ -38,6 +38,7 @@ import org.elasticsearch.cluster.routing.TestShardRouting; import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.common.settings.Settings; +import 
org.elasticsearch.index.shard.IndexLongFieldRange; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.test.ESTestCase; @@ -97,6 +98,8 @@ public static ClusterState state(String index, boolean activePrimaryLocal, Shard .put(SETTING_VERSION_CREATED, Version.CURRENT) .put(SETTING_NUMBER_OF_SHARDS, 1).put(SETTING_NUMBER_OF_REPLICAS, numberOfReplicas) .put(SETTING_CREATION_DATE, System.currentTimeMillis())).primaryTerm(0, primaryTerm) + .timestampMillisRange(primaryState == ShardRoutingState.STARTED || primaryState == ShardRoutingState.RELOCATING + ? IndexLongFieldRange.UNKNOWN : IndexLongFieldRange.NO_SHARDS) .build(); IndexShardRoutingTable.Builder indexShardRoutingBuilder = new IndexShardRoutingTable.Builder(shardId); @@ -300,6 +303,7 @@ public static ClusterState stateWithAssignedPrimariesAndReplicas(String[] indice IndexMetadata indexMetadata = IndexMetadata.builder(index) .settings(Settings.builder().put(SETTING_VERSION_CREATED, Version.CURRENT).put(SETTING_NUMBER_OF_SHARDS, numberOfShards) .put(SETTING_NUMBER_OF_REPLICAS, numberOfReplicas).put(SETTING_CREATION_DATE, System.currentTimeMillis())) + .timestampMillisRange(IndexLongFieldRange.UNKNOWN) .build(); metadataBuilder.put(indexMetadata, false).generateClusterUuidIfNeeded(); IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(indexMetadata.getIndex()); diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java index 54f46f2e7ec03..979020ddd2640 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java @@ -123,7 +123,7 @@ public abstract class IndexShardTestCase extends ESTestCase { protected static final PeerRecoveryTargetService.RecoveryListener recoveryListener = new PeerRecoveryTargetService.RecoveryListener() { @Override - public void onRecoveryDone(RecoveryState state) { + public void onRecoveryDone(RecoveryState state, ShardLongFieldRange timestampMillisFieldRange) { } diff --git a/x-pack/plugin/frozen-indices/src/internalClusterTest/java/org/elasticsearch/index/engine/FrozenIndexIT.java b/x-pack/plugin/frozen-indices/src/internalClusterTest/java/org/elasticsearch/index/engine/FrozenIndexIT.java new file mode 100644 index 0000000000000..4143fe08f64aa --- /dev/null +++ b/x-pack/plugin/frozen-indices/src/internalClusterTest/java/org/elasticsearch/index/engine/FrozenIndexIT.java @@ -0,0 +1,100 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. 
+ */ + +package org.elasticsearch.index.engine; + +import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.action.support.ActiveShardCount; +import org.elasticsearch.cluster.metadata.DataStream; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.routing.allocation.command.AllocateStalePrimaryAllocationCommand; +import org.elasticsearch.cluster.routing.allocation.command.CancelAllocationCommand; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.shard.IndexLongFieldRange; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.protocol.xpack.frozen.FreezeRequest; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.test.InternalTestCluster; +import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin; +import org.elasticsearch.xpack.core.frozen.action.FreezeIndexAction; +import org.elasticsearch.xpack.frozen.FrozenIndices; +import org.joda.time.Instant; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; + +import static org.elasticsearch.cluster.metadata.IndexMetadata.INDEX_ROUTING_EXCLUDE_GROUP_SETTING; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.sameInstance; + +@ESIntegTestCase.ClusterScope(numDataNodes = 0, transportClientRatio = 0) +public class FrozenIndexIT extends ESIntegTestCase { + + @Override + protected Collection> nodePlugins() { + return org.elasticsearch.common.collect.List.of(FrozenIndices.class, LocalStateCompositeXPackPlugin.class); + } + + @Override + protected boolean addMockInternalEngine() { + return false; + } + + public void testTimestampRangeRecalculatedOnStalePrimaryAllocation() throws IOException { + final List nodeNames = internalCluster().startNodes(2); + + createIndex("index", Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) + .build()); + + final IndexResponse indexResponse = client().prepareIndex("index", "_doc") + .setSource(DataStream.TimestampField.FIXED_TIMESTAMP_FIELD, "2010-01-06T02:03:04.567Z").get(); + + ensureGreen("index"); + + assertThat(client().admin().indices().prepareFlush("index").get().getSuccessfulShards(), equalTo(2)); + assertThat(client().admin().indices().prepareRefresh("index").get().getSuccessfulShards(), equalTo(2)); + + final String excludeSetting = INDEX_ROUTING_EXCLUDE_GROUP_SETTING.getConcreteSettingForNamespace("_name").getKey(); + assertAcked(client().admin().indices().prepareUpdateSettings("index").setSettings( + Settings.builder().put(excludeSetting, nodeNames.get(0)))); + assertAcked(client().admin().cluster().prepareReroute().add(new CancelAllocationCommand("index", 0, nodeNames.get(0), true))); + assertThat(client().admin().cluster().prepareHealth("index").get().getUnassignedShards(), equalTo(1)); + + assertThat(client().prepareDelete("index", "_doc", indexResponse.getId()).get().status(), equalTo(RestStatus.OK)); + + assertAcked(client().execute(FreezeIndexAction.INSTANCE, + new FreezeRequest("index").waitForActiveShards(ActiveShardCount.ONE)).actionGet()); + + assertThat(client().admin().cluster().prepareState().get().getState().metadata().index("index").getTimestampMillisRange(), + sameInstance(IndexLongFieldRange.EMPTY)); + + 
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodeNames.get(1))); + assertThat(client().admin().cluster().prepareHealth("index").get().getUnassignedShards(), equalTo(2)); + assertAcked(client().admin().indices().prepareUpdateSettings("index") + .setSettings(Settings.builder().putNull(excludeSetting))); + assertThat(client().admin().cluster().prepareHealth("index").get().getUnassignedShards(), equalTo(2)); + + assertAcked(client().admin().cluster().prepareReroute().add( + new AllocateStalePrimaryAllocationCommand("index", 0, nodeNames.get(0), true))); + + ensureYellowAndNoInitializingShards("index"); + + final IndexLongFieldRange timestampFieldRange + = client().admin().cluster().prepareState().get().getState().metadata().index("index").getTimestampMillisRange(); + assertThat(timestampFieldRange, not(sameInstance(IndexLongFieldRange.UNKNOWN))); + assertThat(timestampFieldRange, not(sameInstance(IndexLongFieldRange.EMPTY))); + assertTrue(timestampFieldRange.isComplete()); + assertThat(timestampFieldRange.getMin(), equalTo(Instant.parse("2010-01-06T02:03:04.567Z").getMillis())); + assertThat(timestampFieldRange.getMax(), equalTo(Instant.parse("2010-01-06T02:03:04.567Z").getMillis())); + } + +} diff --git a/x-pack/plugin/frozen-indices/src/internalClusterTest/java/org/elasticsearch/index/engine/FrozenIndexTests.java b/x-pack/plugin/frozen-indices/src/internalClusterTest/java/org/elasticsearch/index/engine/FrozenIndexTests.java index f54acf87321e9..5a4e8dcf63a90 100644 --- a/x-pack/plugin/frozen-indices/src/internalClusterTest/java/org/elasticsearch/index/engine/FrozenIndexTests.java +++ b/x-pack/plugin/frozen-indices/src/internalClusterTest/java/org/elasticsearch/index/engine/FrozenIndexTests.java @@ -8,17 +8,15 @@ import org.elasticsearch.action.OriginalIndices; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; +import org.elasticsearch.action.admin.indices.stats.ShardStats; import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.index.IndexResponse; -import org.elasticsearch.search.builder.PointInTimeBuilder; -import org.elasticsearch.xpack.core.XPackPlugin; -import org.elasticsearch.xpack.core.search.action.ClosePointInTimeAction; -import org.elasticsearch.xpack.core.search.action.ClosePointInTimeRequest; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchType; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.routing.RecoverySource; import org.elasticsearch.common.Strings; @@ -30,6 +28,7 @@ import org.elasticsearch.index.IndexService; import org.elasticsearch.index.query.MatchAllQueryBuilder; import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.index.shard.IndexLongFieldRange; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardTestCase; import org.elasticsearch.indices.IndicesService; @@ -38,16 +37,22 @@ import org.elasticsearch.protocol.xpack.frozen.FreezeRequest; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.SearchService; +import org.elasticsearch.search.builder.PointInTimeBuilder; import 
org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.internal.AliasFilter; import org.elasticsearch.search.internal.ShardSearchRequest; import org.elasticsearch.test.ESSingleNodeTestCase; +import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin; +import org.elasticsearch.xpack.core.frozen.action.FreezeIndexAction; +import org.elasticsearch.xpack.core.search.action.ClosePointInTimeAction; +import org.elasticsearch.xpack.core.search.action.ClosePointInTimeRequest; import org.elasticsearch.xpack.core.search.action.OpenPointInTimeAction; import org.elasticsearch.xpack.core.search.action.OpenPointInTimeRequest; import org.elasticsearch.xpack.core.search.action.OpenPointInTimeResponse; import org.elasticsearch.xpack.core.XPackClient; import org.elasticsearch.xpack.frozen.FrozenIndices; import org.hamcrest.Matchers; +import org.joda.time.Instant; import java.io.IOException; import java.util.Arrays; @@ -62,13 +67,15 @@ import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.sameInstance; public class FrozenIndexTests extends ESSingleNodeTestCase { @Override protected Collection> getPlugins() { - return pluginList(FrozenIndices.class, XPackPlugin.class); + return pluginList(FrozenIndices.class, LocalStateCompositeXPackPlugin.class); } String openReaders(TimeValue keepAlive, String... indices) { @@ -190,10 +197,13 @@ public void testSearchAndGetAPIsAreThrottled() throws Exception { } public void testFreezeAndUnfreeze() throws ExecutionException, InterruptedException { - createIndex("index", Settings.builder().put("index.number_of_shards", 2).build()); - client().prepareIndex("index", "_doc", "1").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get(); - client().prepareIndex("index", "_doc", "2").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get(); - client().prepareIndex("index", "_doc", "3").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get(); + final IndexService originalIndexService = createIndex("index", Settings.builder().put("index.number_of_shards", 2).build()); + assertThat(originalIndexService.getMetadata().getTimestampMillisRange(), sameInstance(IndexLongFieldRange.UNKNOWN)); + + client().prepareIndex("index", "_doc", "1").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get(); + client().prepareIndex("index", "_doc", "2").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get(); + client().prepareIndex("index", "_doc", "3").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get(); + if (randomBoolean()) { // sometimes close it assertAcked(client().admin().indices().prepareClose("index").get()); @@ -207,6 +217,7 @@ public void testFreezeAndUnfreeze() throws ExecutionException, InterruptedExcept assertTrue(indexService.getIndexSettings().isSearchThrottled()); IndexShard shard = indexService.getShard(0); assertEquals(0, shard.refreshStats().getTotal()); + assertThat(indexService.getMetadata().getTimestampMillisRange(), sameInstance(IndexLongFieldRange.UNKNOWN)); } assertAcked(xPackClient.freeze(new FreezeRequest("index").setFreeze(false))); { @@ -217,6 +228,7 @@ public void testFreezeAndUnfreeze() throws ExecutionException, InterruptedExcept IndexShard shard = indexService.getShard(0); Engine engine = IndexShardTestCase.getEngine(shard); assertThat(engine, 
Matchers.instanceOf(InternalEngine.class)); + assertThat(indexService.getMetadata().getTimestampMillisRange(), sameInstance(IndexLongFieldRange.UNKNOWN)); } client().prepareIndex("index", "_doc", "4").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get(); } @@ -333,7 +345,7 @@ public void testCanMatch() throws IOException, ExecutionException, InterruptedEx new AliasFilter(null, Strings.EMPTY_ARRAY), 1f, -1, null, null)).canMatch()); IndicesStatsResponse response = client().admin().indices().prepareStats("index").clear().setRefresh(true).get(); - assertEquals(0, response.getTotal().refresh.getTotal()); // never opened a reader + assertEquals(0, response.getTotal().refresh.getTotal()); } } @@ -484,4 +496,61 @@ public void testTranslogStats() throws Exception { equalTo(indexService.getIndexSettings().isSoftDeleteEnabled() ? 0 : nbDocs)); assertThat(stats.getIndex(indexName).getPrimaries().getTranslog().getUncommittedOperations(), equalTo(0)); } + + public void testComputesTimestampRangeFromMilliseconds() { + final int shardCount = between(1, 3); + createIndex("index", Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, shardCount).build()); + final String timestampField = DataStream.TimestampField.FIXED_TIMESTAMP_FIELD; + client().prepareIndex("index", "_doc").setSource(timestampField, "2010-01-05T01:02:03.456Z").get(); + client().prepareIndex("index", "_doc").setSource(timestampField, "2010-01-06T02:03:04.567Z").get(); + + assertAcked(client().execute(FreezeIndexAction.INSTANCE, new FreezeRequest("index")).actionGet()); + + final IndexLongFieldRange timestampFieldRange + = client().admin().cluster().prepareState().get().getState().metadata().index("index").getTimestampMillisRange(); + assertThat(timestampFieldRange, not(sameInstance(IndexLongFieldRange.UNKNOWN))); + assertThat(timestampFieldRange, not(sameInstance(IndexLongFieldRange.EMPTY))); + assertTrue(timestampFieldRange.isComplete()); + assertThat(timestampFieldRange.getMin(), equalTo(Instant.parse("2010-01-05T01:02:03.456Z").getMillis())); + assertThat(timestampFieldRange.getMax(), equalTo(Instant.parse("2010-01-06T02:03:04.567Z").getMillis())); + + for (ShardStats shardStats : client().admin().indices().prepareStats("index").clear().setRefresh(true).get().getShards()) { + assertThat("shard " + shardStats.getShardRouting() + " refreshed to get the timestamp range", + shardStats.getStats().refresh.getTotal(), greaterThanOrEqualTo(1L)); + } + } + + public void testComputesTimestampRangeFromNanoseconds() throws IOException { + + final String timestampField = DataStream.TimestampField.FIXED_TIMESTAMP_FIELD; + final XContentBuilder mapping = XContentFactory.jsonBuilder().startObject() + .startObject("properties") + .startObject(timestampField) + .field("type", "date_nanos") + .field("format", "strict_date_optional_time_nanos") + .endObject() + .endObject() + .endObject(); + + final int shardCount = between(1, 3); + createIndex("index", Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, shardCount).build(), "_doc", mapping); + client().prepareIndex("index", "_doc").setSource(timestampField, "2010-01-05T01:02:03.456789012Z").get(); + client().prepareIndex("index", "_doc").setSource(timestampField, "2010-01-06T02:03:04.567890123Z").get(); + + assertAcked(client().execute(FreezeIndexAction.INSTANCE, new FreezeRequest("index")).actionGet()); + + final IndexLongFieldRange timestampFieldRange + = client().admin().cluster().prepareState().get().getState().metadata().index("index").getTimestampMillisRange(); + 
assertThat(timestampFieldRange, not(sameInstance(IndexLongFieldRange.UNKNOWN))); + assertThat(timestampFieldRange, not(sameInstance(IndexLongFieldRange.EMPTY))); + assertTrue(timestampFieldRange.isComplete()); + assertThat(timestampFieldRange.getMin(), equalTo(Instant.parse("2010-01-05T01:02:03.456Z").getMillis())); + assertThat(timestampFieldRange.getMax(), equalTo(Instant.parse("2010-01-06T02:03:04.568Z").getMillis())); + + for (ShardStats shardStats : client().admin().indices().prepareStats("index").clear().setRefresh(true).get().getShards()) { + assertThat("shard " + shardStats.getShardRouting() + " refreshed to get the timestamp range", + shardStats.getStats().refresh.getTotal(), greaterThanOrEqualTo(1L)); + } + } + } diff --git a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsIntegTests.java b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsIntegTests.java index 001993c283001..1209842a697ac 100644 --- a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsIntegTests.java +++ b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsIntegTests.java @@ -19,6 +19,7 @@ import org.elasticsearch.action.admin.indices.stats.ShardStats; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.ShardRouting; @@ -28,11 +29,13 @@ import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.concurrent.AtomicArray; +import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.env.Environment; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexModule; import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.shard.IndexLongFieldRange; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardPath; import org.elasticsearch.indices.IndicesService; @@ -49,6 +52,7 @@ import org.elasticsearch.xpack.searchablesnapshots.action.SearchableSnapshotsStatsResponse; import org.elasticsearch.xpack.searchablesnapshots.cache.CacheService; import org.hamcrest.Matchers; +import org.joda.time.Instant; import java.io.IOException; import java.nio.file.Files; @@ -83,6 +87,8 @@ import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.lessThanOrEqualTo; +import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.sameInstance; public class SearchableSnapshotsIntegTests extends BaseSearchableSnapshotsIntegTestCase { @@ -132,6 +138,21 @@ public void testCreateAndRestoreSearchableSnapshot() throws Exception { assertShardFolders(indexName, false); + assertThat( + client().admin() + .cluster() + .prepareState() + .clear() + .setMetadata(true) + .setIndices(indexName) + .get() + .getState() + .metadata() + .index(indexName) + .getTimestampMillisRange(), + sameInstance(IndexLongFieldRange.UNKNOWN) + ); + final boolean deletedBeforeMount = randomBoolean(); if 
diff --git a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsIntegTests.java b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsIntegTests.java
index 001993c283001..1209842a697ac 100644
--- a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsIntegTests.java
+++ b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsIntegTests.java
@@ -19,6 +19,7 @@
 import org.elasticsearch.action.admin.indices.stats.ShardStats;
 import org.elasticsearch.action.index.IndexRequestBuilder;
 import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.metadata.DataStream;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.routing.ShardRouting;
@@ -28,11 +29,13 @@
 import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.util.concurrent.AtomicArray;
+import org.elasticsearch.common.xcontent.XContentFactory;
 import org.elasticsearch.env.Environment;
 import org.elasticsearch.env.NodeEnvironment;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.IndexModule;
 import org.elasticsearch.index.IndexSettings;
+import org.elasticsearch.index.shard.IndexLongFieldRange;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.index.shard.ShardPath;
 import org.elasticsearch.indices.IndicesService;
@@ -49,6 +52,7 @@
 import org.elasticsearch.xpack.searchablesnapshots.action.SearchableSnapshotsStatsResponse;
 import org.elasticsearch.xpack.searchablesnapshots.cache.CacheService;
 import org.hamcrest.Matchers;
+import org.joda.time.Instant;
 
 import java.io.IOException;
 import java.nio.file.Files;
@@ -83,6 +87,8 @@
 import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.lessThanOrEqualTo;
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.sameInstance;
 
 public class SearchableSnapshotsIntegTests extends BaseSearchableSnapshotsIntegTestCase {
 
@@ -132,6 +138,21 @@ public void testCreateAndRestoreSearchableSnapshot() throws Exception {
 
         assertShardFolders(indexName, false);
 
+        assertThat(
+            client().admin()
+                .cluster()
+                .prepareState()
+                .clear()
+                .setMetadata(true)
+                .setIndices(indexName)
+                .get()
+                .getState()
+                .metadata()
+                .index(indexName)
+                .getTimestampMillisRange(),
+            sameInstance(IndexLongFieldRange.UNKNOWN)
+        );
+
         final boolean deletedBeforeMount = randomBoolean();
         if (deletedBeforeMount) {
             assertAcked(client().admin().indices().prepareDelete(indexName));
@@ -214,6 +235,21 @@ public void testCreateAndRestoreSearchableSnapshot() throws Exception {
 
         ensureGreen(restoredIndexName);
         assertShardFolders(restoredIndexName, true);
+        assertThat(
+            client().admin()
+                .cluster()
+                .prepareState()
+                .clear()
+                .setMetadata(true)
+                .setIndices(restoredIndexName)
+                .get()
+                .getState()
+                .metadata()
+                .index(restoredIndexName)
+                .getTimestampMillisRange(),
+            sameInstance(IndexLongFieldRange.UNKNOWN)
+        );
+
         if (deletedBeforeMount) {
             assertThat(client().admin().indices().prepareGetAliases(aliasName).get().getAliases().size(), equalTo(0));
             assertAcked(client().admin().indices().prepareAliases().addAlias(restoredIndexName, aliasName));
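The hunk that follows distinguishes three shapes of recorded range: a restored regular index stays at IndexLongFieldRange.UNKNOWN, a mounted index that turns out to contain no documents (and therefore no timestamp values) ends up at the complete-but-empty sentinel EMPTY, and otherwise the range is complete with inclusive millisecond bounds available through getMin() and getMax(). The class below is a deliberately simplified stand-in written for this note to summarise those three states; it is not the IndexLongFieldRange implementation added by this patch, which also tracks per-shard completeness and handles wire and XContent serialization.

// Simplified stand-in used only to illustrate the three states asserted on below.
final class FieldRangeSketch {
    static final FieldRangeSketch UNKNOWN = new FieldRangeSketch(false, Long.MAX_VALUE, Long.MIN_VALUE); // never computed
    static final FieldRangeSketch EMPTY = new FieldRangeSketch(true, Long.MAX_VALUE, Long.MIN_VALUE);    // computed, no values

    private final boolean complete;
    private final long minMillis; // inclusive, only meaningful for a concrete range
    private final long maxMillis; // inclusive, only meaningful for a concrete range

    private FieldRangeSketch(boolean complete, long minMillis, long maxMillis) {
        this.complete = complete;
        this.minMillis = minMillis;
        this.maxMillis = maxMillis;
    }

    static FieldRangeSketch of(long minMillis, long maxMillis) {
        return new FieldRangeSketch(true, minMillis, maxMillis); // concrete, complete range
    }

    boolean isComplete() {
        return complete;
    }

    long getMin() {
        return minMillis;
    }

    long getMax() {
        return maxMillis;
    }
}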
@@ -668,6 +704,89 @@ public void testSnapshotMountedIndexLeavesBlobsUntouched() throws Exception {
         assertThat(snapshotTwoStatus.getStats().getProcessedFileCount(), equalTo(numShards)); // one segment_N per shard
     }
 
+    public void testSnapshotMountedIndexWithTimestampsRecordsTimestampRangeInIndexMetadata() throws Exception {
+        final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
+        final int numShards = between(1, 3);
+
+        assertAcked(
+            client().admin()
+                .indices()
+                .prepareCreate(indexName)
+                .addMapping(
+                    "_doc",
+                    XContentFactory.jsonBuilder()
+                        .startObject()
+                        .startObject("properties")
+                        .startObject(DataStream.TimestampField.FIXED_TIMESTAMP_FIELD)
+                        .field("type", "date_nanos")
+                        .field("format", "strict_date_optional_time_nanos")
+                        .endObject()
+                        .endObject()
+                        .endObject()
+                )
+                .setSettings(indexSettingsNoReplicas(numShards).put(INDEX_SOFT_DELETES_SETTING.getKey(), true))
+        );
+        ensureGreen(indexName);
+
+        final List<IndexRequestBuilder> indexRequestBuilders = new ArrayList<>();
+        final int docCount = between(0, 1000);
+        for (int i = 0; i < docCount; i++) {
+            indexRequestBuilders.add(
+                client().prepareIndex(indexName, "_doc")
+                    .setSource(
+                        DataStream.TimestampField.FIXED_TIMESTAMP_FIELD,
+                        String.format(
+                            Locale.ROOT,
+                            "2020-11-26T%02d:%02d:%02d.%09dZ",
+                            between(0, 23),
+                            between(0, 59),
+                            between(0, 59),
+                            randomLongBetween(0, 999999999L)
+                        )
+                    )
+            );
+        }
+        indexRandom(true, false, indexRequestBuilders);
+        assertThat(
+            client().admin().indices().prepareForceMerge(indexName).setOnlyExpungeDeletes(true).setFlush(true).get().getFailedShards(),
+            equalTo(0)
+        );
+        refresh(indexName);
+        forceMerge();
+
+        final String repositoryName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
+        createRepository(repositoryName, "fs");
+
+        final SnapshotId snapshotOne = createSnapshot(repositoryName, "snapshot-1", org.elasticsearch.common.collect.List.of(indexName))
+            .snapshotId();
+        assertAcked(client().admin().indices().prepareDelete(indexName));
+
+        mountSnapshot(repositoryName, snapshotOne.getName(), indexName, indexName, Settings.EMPTY);
+        ensureGreen(indexName);
+
+        final IndexLongFieldRange timestampMillisRange = client().admin()
+            .cluster()
+            .prepareState()
+            .clear()
+            .setMetadata(true)
+            .setIndices(indexName)
+            .get()
+            .getState()
+            .metadata()
+            .index(indexName)
+            .getTimestampMillisRange();
+
+        assertTrue(timestampMillisRange.isComplete());
+        assertThat(timestampMillisRange, not(sameInstance(IndexLongFieldRange.UNKNOWN)));
+        if (docCount == 0) {
+            assertThat(timestampMillisRange, sameInstance(IndexLongFieldRange.EMPTY));
+        } else {
+            assertThat(timestampMillisRange, not(sameInstance(IndexLongFieldRange.EMPTY)));
+            assertThat(timestampMillisRange.getMin(), greaterThanOrEqualTo(Instant.parse("2020-11-26T00:00:00Z").getMillis()));
+            assertThat(timestampMillisRange.getMin(), lessThanOrEqualTo(Instant.parse("2020-11-27T00:00:00Z").getMillis()));
+        }
+    }
+
     private void assertTotalHits(String indexName, TotalHits originalAllHits, TotalHits originalBarHits) throws Exception {
         final Thread[] threads = new Thread[between(1, 5)];
         final AtomicArray<TotalHits> allHits = new AtomicArray<>(threads.length);
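The tests above care that a recorded range is complete and carries inclusive millisecond bounds because a complete range lets a bounded timestamp filter be checked against an index without touching its shards. The helper below is a sketch written for this note using only the accessors exercised above (isComplete, getMin, getMax); the method and class names are invented, the encoding of an empty range as min > max is a convention of this sketch rather than of IndexLongFieldRange, and this is not the coordinating-node can-match logic, which is outside this excerpt.

// Sketch only: decides whether an index with a recorded timestamp range could contain
// documents matching an inclusive [fromMillis, toMillis] filter.
final class TimestampRangeSkipSketch {

    static boolean mayMatch(boolean rangeComplete, long minMillis, long maxMillis, long fromMillis, long toMillis) {
        if (rangeComplete == false) {
            return true; // incomplete or unknown range: the index cannot safely be skipped
        }
        if (minMillis > maxMillis) {
            return false; // empty range: no document has a value for the timestamp field
        }
        return maxMillis >= fromMillis && minMillis <= toMillis; // the two intervals overlap
    }

    public static void main(String[] args) {
        // An index covering 2010-01-05T01:02:03.456Z..2010-01-06T02:03:04.567Z can be skipped
        // for a filter over 2020, but not for a filter over early January 2010.
        final long min = java.time.Instant.parse("2010-01-05T01:02:03.456Z").toEpochMilli();
        final long max = java.time.Instant.parse("2010-01-06T02:03:04.567Z").toEpochMilli();
        System.out.println(mayMatch(true, min, max,
            java.time.Instant.parse("2020-01-01T00:00:00Z").toEpochMilli(),
            java.time.Instant.parse("2021-01-01T00:00:00Z").toEpochMilli())); // false
        System.out.println(mayMatch(true, min, max,
            java.time.Instant.parse("2010-01-01T00:00:00Z").toEpochMilli(),
            java.time.Instant.parse("2010-01-31T23:59:59Z").toEpochMilli())); // true
    }
}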