Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Record timestamp field range in index metadata #65689

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.cluster.action.shard;

import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
Expand All @@ -36,7 +37,9 @@
import org.elasticsearch.cluster.NotMasterException;
import org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.RerouteService;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
Expand All @@ -50,6 +53,9 @@
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.IndexLongFieldRange;
import org.elasticsearch.index.shard.ShardLongFieldRange;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.tasks.Task;
Expand All @@ -67,9 +73,11 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Predicate;
Expand Down Expand Up @@ -519,16 +527,23 @@ public int hashCode() {
public void shardStarted(final ShardRouting shardRouting,
final long primaryTerm,
final String message,
final ShardLongFieldRange timestampMillisRange,
final ActionListener<Void> listener) {
shardStarted(shardRouting, primaryTerm, message, listener, clusterService.state());
shardStarted(shardRouting, primaryTerm, message, timestampMillisRange, listener, clusterService.state());
}

public void shardStarted(final ShardRouting shardRouting,
final long primaryTerm,
final String message,
final ShardLongFieldRange timestampMillisRange,
final ActionListener<Void> listener,
final ClusterState currentState) {
StartedShardEntry entry = new StartedShardEntry(shardRouting.shardId(), shardRouting.allocationId().getId(), primaryTerm, message);
final StartedShardEntry entry = new StartedShardEntry(
shardRouting.shardId(),
shardRouting.allocationId().getId(),
primaryTerm,
message,
timestampMillisRange);
sendShardAction(SHARD_STARTED_ACTION_NAME, currentState, entry, listener);
}

Expand Down Expand Up @@ -578,6 +593,7 @@ public ClusterTasksResult<StartedShardEntry> execute(ClusterState currentState,
List<StartedShardEntry> tasksToBeApplied = new ArrayList<>();
List<ShardRouting> shardRoutingsToBeApplied = new ArrayList<>(tasks.size());
Set<ShardRouting> seenShardRoutings = new HashSet<>(); // to prevent duplicates
final Map<Index, IndexLongFieldRange> updatedTimestampRanges = new HashMap<>();
for (StartedShardEntry task : tasks) {
final ShardRouting matched = currentState.getRoutingTable().getByAllocationId(task.shardId, task.allocationId);
if (matched == null) {
Expand Down Expand Up @@ -618,6 +634,22 @@ public ClusterTasksResult<StartedShardEntry> execute(ClusterState currentState,
tasksToBeApplied.add(task);
shardRoutingsToBeApplied.add(matched);
seenShardRoutings.add(matched);

// expand the timestamp range recorded in the index metadata if needed
final Index index = task.shardId.getIndex();
IndexLongFieldRange currentTimestampMillisRange = updatedTimestampRanges.get(index);
final IndexMetadata indexMetadata = currentState.metadata().index(index);
if (currentTimestampMillisRange == null) {
currentTimestampMillisRange = indexMetadata.getTimestampMillisRange();
}
final IndexLongFieldRange newTimestampMillisRange;
newTimestampMillisRange = currentTimestampMillisRange.extendWithShardRange(
task.shardId.id(),
indexMetadata.getNumberOfShards(),
task.timestampMillisRange);
if (newTimestampMillisRange != currentTimestampMillisRange) {
updatedTimestampRanges.put(index, newTimestampMillisRange);
}
}
}
}
Expand All @@ -627,6 +659,19 @@ public ClusterTasksResult<StartedShardEntry> execute(ClusterState currentState,
ClusterState maybeUpdatedState = currentState;
try {
maybeUpdatedState = allocationService.applyStartedShards(currentState, shardRoutingsToBeApplied);

if (updatedTimestampRanges.isEmpty() == false) {
final Metadata.Builder metadataBuilder = Metadata.builder(maybeUpdatedState.metadata());
for (Map.Entry<Index, IndexLongFieldRange> updatedTimestampRangeEntry : updatedTimestampRanges.entrySet()) {
metadataBuilder.put(IndexMetadata
.builder(metadataBuilder.getSafe(updatedTimestampRangeEntry.getKey()))
.timestampMillisRange(updatedTimestampRangeEntry.getValue()));
}
maybeUpdatedState = ClusterState.builder(maybeUpdatedState).metadata(metadataBuilder).build();
}

assert assertStartedIndicesHaveCompleteTimestampRanges(maybeUpdatedState);

builder.successes(tasksToBeApplied);
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("failed to apply started shards {}", shardRoutingsToBeApplied), e);
Expand All @@ -636,6 +681,16 @@ public ClusterTasksResult<StartedShardEntry> execute(ClusterState currentState,
return builder.build(maybeUpdatedState);
}

private static boolean assertStartedIndicesHaveCompleteTimestampRanges(ClusterState clusterState) {
for (ObjectObjectCursor<String, IndexRoutingTable> cursor : clusterState.getRoutingTable().getIndicesRouting()) {
assert cursor.value.allPrimaryShardsActive() == false
|| clusterState.metadata().index(cursor.key).getTimestampMillisRange().isComplete()
: "index [" + cursor.key + "] should have complete timestamp range, but got "
+ clusterState.metadata().index(cursor.key).getTimestampMillisRange() + " for " + cursor.value.prettyPrint();
}
return true;
}

@Override
public void onFailure(String source, Exception e) {
if (e instanceof FailedToCommitClusterStateException || e instanceof NotMasterException) {
Expand All @@ -658,6 +713,7 @@ public static class StartedShardEntry extends TransportRequest {
final String allocationId;
final long primaryTerm;
final String message;
final ShardLongFieldRange timestampMillisRange;

StartedShardEntry(StreamInput in) throws IOException {
super(in);
Expand All @@ -676,13 +732,19 @@ public static class StartedShardEntry extends TransportRequest {
final Exception ex = in.readException();
assert ex == null : "started shard must not have failure [" + ex + "]";
}
this.timestampMillisRange = ShardLongFieldRange.readFrom(in);
}

public StartedShardEntry(final ShardId shardId, final String allocationId, final long primaryTerm, final String message) {
public StartedShardEntry(final ShardId shardId,
final String allocationId,
final long primaryTerm,
final String message,
final ShardLongFieldRange timestampMillisRange) {
this.shardId = shardId;
this.allocationId = allocationId;
this.primaryTerm = primaryTerm;
this.message = message;
this.timestampMillisRange = timestampMillisRange;
}

@Override
Expand All @@ -699,6 +761,7 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().before(Version.V_6_3_0)) {
out.writeException(null);
}
timestampMillisRange.writeTo(out);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.elasticsearch.index.Index;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.IndexLongFieldRange;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.rest.RestStatus;

Expand Down Expand Up @@ -340,6 +341,7 @@ public static APIBlock readFrom(StreamInput input) throws IOException {
static final String KEY_ALIASES = "aliases";
static final String KEY_ROLLOVER_INFOS = "rollover_info";
static final String KEY_SYSTEM = "system";
static final String KEY_TIMESTAMP_RANGE = "timestamp_range";
public static final String KEY_PRIMARY_TERMS = "primary_terms";

public static final String INDEX_STATE_FILE_PREFIX = "state-";
Expand Down Expand Up @@ -390,6 +392,8 @@ public static APIBlock readFrom(StreamInput input) throws IOException {
private final ImmutableOpenMap<String, RolloverInfo> rolloverInfos;
private final boolean isSystem;

private final IndexLongFieldRange timestampMillisRange;

private IndexMetadata(
final Index index,
final long version,
Expand All @@ -415,7 +419,8 @@ private IndexMetadata(
final int routingPartitionSize,
final ActiveShardCount waitForActiveShards,
final ImmutableOpenMap<String, RolloverInfo> rolloverInfos,
final boolean isSystem) {
final boolean isSystem,
final IndexLongFieldRange timestampMillisRange) {

this.index = index;
this.version = version;
Expand Down Expand Up @@ -448,6 +453,7 @@ private IndexMetadata(
this.waitForActiveShards = waitForActiveShards;
this.rolloverInfos = rolloverInfos;
this.isSystem = isSystem;
this.timestampMillisRange = timestampMillisRange;
assert numberOfShards * routingFactor == routingNumShards : routingNumShards + " must be a multiple of " + numberOfShards;
}

Expand Down Expand Up @@ -661,6 +667,10 @@ public DiscoveryNodeFilters excludeFilters() {
return excludeFilters;
}

public IndexLongFieldRange getTimestampMillisRange() {
return timestampMillisRange;
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand Down Expand Up @@ -770,6 +780,7 @@ private static class IndexMetadataDiff implements Diff<IndexMetadata> {
private final Diff<ImmutableOpenIntMap<Set<String>>> inSyncAllocationIds;
private final Diff<ImmutableOpenMap<String, RolloverInfo>> rolloverInfos;
private final boolean isSystem;
private final IndexLongFieldRange timestampMillisRange;

IndexMetadataDiff(IndexMetadata before, IndexMetadata after) {
index = after.index.getName();
Expand All @@ -788,6 +799,7 @@ private static class IndexMetadataDiff implements Diff<IndexMetadata> {
DiffableUtils.getVIntKeySerializer(), DiffableUtils.StringSetValueSerializer.getInstance());
rolloverInfos = DiffableUtils.diff(before.rolloverInfos, after.rolloverInfos, DiffableUtils.getStringKeySerializer());
isSystem = after.isSystem;
timestampMillisRange = after.timestampMillisRange;
}

private static final DiffableUtils.DiffableValueReader<String, AliasMetadata> ALIAS_METADATA_DIFF_VALUE_READER =
Expand Down Expand Up @@ -833,6 +845,7 @@ private static class IndexMetadataDiff implements Diff<IndexMetadata> {
} else {
isSystem = false;
}
timestampMillisRange = IndexLongFieldRange.readFrom(in);
}

@Override
Expand Down Expand Up @@ -862,6 +875,7 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(SYSTEM_INDEX_FLAG_ADDED)) {
out.writeBoolean(isSystem);
}
timestampMillisRange.writeTo(out);
}

@Override
Expand All @@ -881,6 +895,7 @@ public IndexMetadata apply(IndexMetadata part) {
builder.inSyncAllocationIds.putAll(inSyncAllocationIds.apply(part.inSyncAllocationIds));
builder.rolloverInfos.putAll(rolloverInfos.apply(part.rolloverInfos));
builder.system(part.isSystem);
builder.timestampMillisRange(timestampMillisRange);
return builder.build();
}
}
Expand Down Expand Up @@ -938,6 +953,7 @@ public static IndexMetadata readFrom(StreamInput in) throws IOException {
if (in.getVersion().onOrAfter(SYSTEM_INDEX_FLAG_ADDED)) {
builder.system(in.readBoolean());
}
builder.timestampMillisRange(IndexLongFieldRange.readFrom(in));
return builder.build();
}

Expand Down Expand Up @@ -989,6 +1005,7 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(SYSTEM_INDEX_FLAG_ADDED)) {
out.writeBoolean(isSystem);
}
timestampMillisRange.writeTo(out);
}

public boolean isSystem() {
Expand Down Expand Up @@ -1020,6 +1037,7 @@ public static class Builder {
private final ImmutableOpenMap.Builder<String, RolloverInfo> rolloverInfos;
private Integer routingNumShards;
private boolean isSystem;
private IndexLongFieldRange timestampMillisRange = IndexLongFieldRange.NO_SHARDS;

public Builder(String index) {
this.index = index;
Expand Down Expand Up @@ -1047,6 +1065,7 @@ public Builder(IndexMetadata indexMetadata) {
this.inSyncAllocationIds = ImmutableOpenIntMap.builder(indexMetadata.inSyncAllocationIds);
this.rolloverInfos = ImmutableOpenMap.builder(indexMetadata.rolloverInfos);
this.isSystem = indexMetadata.isSystem;
this.timestampMillisRange = indexMetadata.timestampMillisRange;
}

public Builder index(String index) {
Expand Down Expand Up @@ -1255,6 +1274,15 @@ public boolean isSystem() {
return isSystem;
}

public Builder timestampMillisRange(IndexLongFieldRange timestampMillisRange) {
this.timestampMillisRange = timestampMillisRange;
return this;
}

public IndexLongFieldRange getTimestampMillisRange() {
return timestampMillisRange;
}

public IndexMetadata build() {
ImmutableOpenMap.Builder<String, AliasMetadata> tmpAliases = aliases;
Settings tmpSettings = settings;
Expand Down Expand Up @@ -1368,7 +1396,8 @@ public IndexMetadata build() {
routingPartitionSize,
waitForActiveShards,
rolloverInfos.build(),
isSystem);
isSystem,
timestampMillisRange);
}

public static void toXContent(IndexMetadata indexMetadata, XContentBuilder builder, ToXContent.Params params) throws IOException {
Expand Down Expand Up @@ -1469,6 +1498,10 @@ public static void toXContent(IndexMetadata indexMetadata, XContentBuilder build
builder.endObject();
builder.field(KEY_SYSTEM, indexMetadata.isSystem);

builder.startObject(KEY_TIMESTAMP_RANGE);
indexMetadata.timestampMillisRange.toXContent(builder, params);
builder.endObject();

builder.endObject();
}

Expand Down Expand Up @@ -1549,6 +1582,8 @@ public static IndexMetadata fromXContent(XContentParser parser) throws IOExcepti
// simply ignored when upgrading from 2.x
assert Version.CURRENT.major <= 5;
parser.skipChildren();
} else if (KEY_TIMESTAMP_RANGE.equals(currentFieldName)) {
builder.timestampMillisRange(IndexLongFieldRange.fromXContent(parser));
} else {
// assume it's custom index metadata
builder.putCustom(currentFieldName, parser.mapStrings());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.shard.IndexLongFieldRange;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.ShardLimitValidator;
Expand Down Expand Up @@ -769,6 +770,7 @@ static Tuple<ClusterState, Collection<IndexResult>> closeRoutingTable(final Clus
routingTable.remove(index.getName());
} else {
metadata.put(updatedMetadata
.timestampMillisRange(IndexLongFieldRange.NO_SHARDS)
.settingsVersion(indexMetadata.getSettingsVersion() + 1)
.settings(Settings.builder()
.put(indexMetadata.getSettings())
Expand Down Expand Up @@ -858,6 +860,7 @@ ClusterState openIndices(final Index[] indices, final ClusterState currentState)
.state(IndexMetadata.State.OPEN)
.settingsVersion(indexMetadata.getSettingsVersion() + 1)
.settings(updatedSettings)
.timestampMillisRange(IndexLongFieldRange.NO_SHARDS)
.build();

// The index might be closed because we couldn't import it due to old incompatible version
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,8 @@ private IndexMetadata.Builder updateInSyncAllocations(RoutingTable newRoutingTab
final String allocationId;
if (recoverySource == RecoverySource.ExistingStoreRecoverySource.FORCE_STALE_PRIMARY_INSTANCE) {
allocationId = RecoverySource.ExistingStoreRecoverySource.FORCED_ALLOCATION_ID;
indexMetadataBuilder.timestampMillisRange(indexMetadataBuilder.getTimestampMillisRange()
.removeShard(shardId.id(), oldIndexMetadata.getNumberOfShards()));
} else {
assert recoverySource instanceof RecoverySource.SnapshotRecoverySource : recoverySource;
allocationId = updates.initializedPrimary.allocationId().getId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.DocsStats;
import org.elasticsearch.index.shard.ShardLongFieldRange;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
Expand Down Expand Up @@ -1980,4 +1981,12 @@ public interface TranslogRecoveryRunner {
public enum HistorySource {
TRANSLOG, INDEX
}

/**
* @return a {@link ShardLongFieldRange} containing the min and max raw values of the given field for this shard if the engine
* guarantees these values never to change, or {@link ShardLongFieldRange#EMPTY} if this field is empty, or
* {@link ShardLongFieldRange#UNKNOWN} if this field's value range may change in future.
*/
public abstract ShardLongFieldRange getRawFieldRange(String field) throws IOException;

}
Loading