Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow shards of closed indices to be replicated as regular shards #38024

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -410,14 +410,16 @@ static ClusterState closeRoutingTable(final ClusterState currentState,
}

logger.debug("closing index {} succeeded", index);
blocks.removeIndexBlockWithId(index.getName(), INDEX_CLOSED_BLOCK_ID).addIndexBlock(index.getName(), INDEX_CLOSED_BLOCK);
metadata.put(IndexMetaData.builder(indexMetaData).state(IndexMetaData.State.CLOSE));
routingTable.remove(index.getName());
blocks.removeIndexBlockWithId(index.getName(), INDEX_CLOSED_BLOCK_ID);
blocks.addIndexBlock(index.getName(), INDEX_CLOSED_BLOCK);
routingTable.addAsFromOpenToClose(metadata.getSafe(index));
closedIndices.add(index.getName());
} catch (final IndexNotFoundException e) {
logger.debug("index {} has been deleted since it was blocked before closing, ignoring", index);
}
}

logger.info("completed closing of indices {}", closedIndices);
return ClusterState.builder(currentState).blocks(blocks).metaData(metadata).routingTable(routingTable.build()).build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,13 @@ public Builder initializeAsFromCloseToOpen(IndexMetaData indexMetaData) {
return initializeEmpty(indexMetaData, new UnassignedInfo(UnassignedInfo.Reason.INDEX_REOPENED, null));
}

/**
* Initializes a new empty index, as as a result of closing an opened index.
*/
public Builder initializeAsFromOpenToClose(IndexMetaData indexMetaData) {
return initializeEmpty(indexMetaData, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CLOSED, null));
}

/**
* Initializes a new empty index, to be restored from a snapshot
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -540,6 +540,13 @@ public Builder addAsFromCloseToOpen(IndexMetaData indexMetaData) {
return this;
}

public Builder addAsFromOpenToClose(IndexMetaData indexMetaData) {
assert indexMetaData.getState() == IndexMetaData.State.CLOSE;
IndexRoutingTable.Builder indexRoutingBuilder = new IndexRoutingTable.Builder(indexMetaData.getIndex())
.initializeAsFromOpenToClose(indexMetaData);
return add(indexRoutingBuilder);
}

public Builder addAsRestore(IndexMetaData indexMetaData, SnapshotRecoverySource recoverySource) {
IndexRoutingTable.Builder indexRoutingBuilder = new IndexRoutingTable.Builder(indexMetaData.getIndex())
.initializeAsRestore(indexMetaData, recoverySource);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,11 @@ public enum Reason {
/**
* Forced manually to allocate
*/
MANUAL_ALLOCATION
MANUAL_ALLOCATION,
/**
* Unassigned as a result of closing an index.
*/
INDEX_CLOSED
}

/**
Expand Down Expand Up @@ -269,6 +273,8 @@ public UnassignedInfo(StreamInput in) throws IOException {
public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().before(Version.V_6_0_0_beta2) && reason == Reason.MANUAL_ALLOCATION) {
out.writeByte((byte) Reason.ALLOCATION_FAILED.ordinal());
} else if (out.getVersion().before(Version.V_7_0_0) && reason == Reason.INDEX_CLOSED) {
out.writeByte((byte) Reason.REINITIALIZED.ordinal());
} else {
out.writeByte((byte) reason.ordinal());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,7 @@ public NoOpEngine(EngineConfig config) {
protected DirectoryReader open(final IndexCommit commit) throws IOException {
final Directory directory = commit.getDirectory();
final List<IndexCommit> indexCommits = DirectoryReader.listCommits(directory);
assert indexCommits.size() == 1 : "expected only one commit point";
tlrx marked this conversation as resolved.
Show resolved Hide resolved
IndexCommit indexCommit = indexCommits.get(indexCommits.size() - 1);
final IndexCommit indexCommit = indexCommits.get(indexCommits.size() - 1);
return new DirectoryReader(directory, new LeafReader[0]) {
@Override
protected DirectoryReader doOpenIfChanged() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@
import static org.elasticsearch.indices.cluster.IndicesClusterStateService.AllocatedIndices.IndexRemovalReason.DELETED;
import static org.elasticsearch.indices.cluster.IndicesClusterStateService.AllocatedIndices.IndexRemovalReason.FAILURE;
import static org.elasticsearch.indices.cluster.IndicesClusterStateService.AllocatedIndices.IndexRemovalReason.NO_LONGER_ASSIGNED;
import static org.elasticsearch.indices.cluster.IndicesClusterStateService.AllocatedIndices.IndexRemovalReason.REOPENED;

public class IndicesClusterStateService extends AbstractLifecycleComponent implements ClusterStateApplier {
private static final Logger logger = LogManager.getLogger(IndicesClusterStateService.class);
Expand Down Expand Up @@ -240,7 +241,7 @@ public synchronized void applyClusterState(final ClusterChangedEvent event) {

deleteIndices(event); // also deletes shards of deleted indices

removeUnallocatedIndices(event); // also removes shards of removed indices
removeIndices(event); // also removes shards of removed indices

failMissingShards(state);

Expand Down Expand Up @@ -352,17 +353,18 @@ protected void doRun() throws Exception {
}

/**
* Removes indices that have no shards allocated to this node. This does not delete the shard data as we wait for enough
* shard copies to exist in the cluster before deleting shard data (triggered by {@link org.elasticsearch.indices.store.IndicesStore}).
* Removes indices that have no shards allocated to this node or indices whose state has changed. This does not delete the shard data
* as we wait for enough shard copies to exist in the cluster before deleting shard data (triggered by
* {@link org.elasticsearch.indices.store.IndicesStore}).
*
* @param event the cluster changed event
*/
private void removeUnallocatedIndices(final ClusterChangedEvent event) {
private void removeIndices(final ClusterChangedEvent event) {
final ClusterState state = event.state();
final String localNodeId = state.nodes().getLocalNodeId();
assert localNodeId != null;

Set<Index> indicesWithShards = new HashSet<>();
final Set<Index> indicesWithShards = new HashSet<>();
RoutingNode localRoutingNode = state.getRoutingNodes().node(localNodeId);
if (localRoutingNode != null) { // null e.g. if we are not a data node
for (ShardRouting shardRouting : localRoutingNode) {
Expand All @@ -371,20 +373,27 @@ private void removeUnallocatedIndices(final ClusterChangedEvent event) {
}

for (AllocatedIndex<? extends Shard> indexService : indicesService) {
Index index = indexService.index();
if (indicesWithShards.contains(index) == false) {
final Index index = indexService.index();
final IndexMetaData indexMetaData = state.metaData().index(index);
final IndexMetaData existingMetaData = indexService.getIndexSettings().getIndexMetaData();

AllocatedIndices.IndexRemovalReason reason = null;
if (indexMetaData != null && indexMetaData.getState() != existingMetaData.getState()) {
reason = indexMetaData.getState() == IndexMetaData.State.CLOSE ? CLOSED : REOPENED;
} else if (indicesWithShards.contains(index) == false) {
// if the cluster change indicates a brand new cluster, we only want
// to remove the in-memory structures for the index and not delete the
// contents on disk because the index will later be re-imported as a
// dangling index
final IndexMetaData indexMetaData = state.metaData().index(index);
assert indexMetaData != null || event.isNewCluster() :
"index " + index + " does not exist in the cluster state, it should either " +
"have been deleted or the cluster must be new";
final AllocatedIndices.IndexRemovalReason reason =
indexMetaData != null && indexMetaData.getState() == IndexMetaData.State.CLOSE ? CLOSED : NO_LONGER_ASSIGNED;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for BWC reasons, I think we will need to keep this.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess you're thinking of the search context releasing?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for not being clear here. I was rather thinking about the case where an older-version master has removed the routing table for a closed index (i.e. old-style closed indices). We still need to handle these here.

logger.debug("{} removing index, [{}]", index, reason);
indicesService.removeIndex(index, reason, "removing index (no shards allocated)");
reason = indexMetaData != null && indexMetaData.getState() == IndexMetaData.State.CLOSE ? CLOSED : NO_LONGER_ASSIGNED;
}

if (reason != null) {
logger.debug("{} removing index ({})", index, reason);
indicesService.removeIndex(index, reason, "removing index (" + reason + ")");
}
}
}
Expand Down Expand Up @@ -595,7 +604,7 @@ private void updateShard(DiscoveryNodes nodes, ShardRouting shardRouting, Shard
ClusterState clusterState) {
final ShardRouting currentRoutingEntry = shard.routingEntry();
assert currentRoutingEntry.isSameAllocation(shardRouting) :
"local shard has a different allocation id but wasn't cleaning by removeShards. "
"local shard has a different allocation id but wasn't cleaned by removeShards. "
+ "cluster state: " + shardRouting + " local: " + currentRoutingEntry;

final long primaryTerm;
Expand Down Expand Up @@ -730,7 +739,7 @@ private void failAndRemoveShard(ShardRouting shardRouting, boolean sendShardFail
private void sendFailShard(ShardRouting shardRouting, String message, @Nullable Exception failure, ClusterState state) {
try {
logger.warn(() -> new ParameterizedMessage(
"[{}] marking and sending shard failed due to [{}]", shardRouting.shardId(), message), failure);
"{} marking and sending shard failed due to [{}]", shardRouting.shardId(), message), failure);
failedShardsCache.put(shardRouting.shardId(), shardRouting);
shardStateAction.localShardFailed(shardRouting, message, failure, SHARD_STATE_ACTION_LISTENER, state);
} catch (Exception inner) {
Expand Down Expand Up @@ -931,7 +940,7 @@ enum IndexRemovalReason {
DELETED,

/**
* The index have been closed. The index should be removed and all associated resources released. Persistent parts of the index
* The index has been closed. The index should be removed and all associated resources released. Persistent parts of the index
* like the shards files, state and transaction logs are kept around in the case of a disaster recovery.
*/
CLOSED,
Expand All @@ -941,7 +950,13 @@ enum IndexRemovalReason {
* Persistent parts of the index like the shards files, state and transaction logs are kept around in the
* case of a disaster recovery.
*/
FAILURE
FAILURE,

/**
* The index has been reopened. The index should be removed and all associated resources released. Persistent parts of the index
* like the shards files, state and transaction logs are kept around in the case of a disaster recovery.
*/
REOPENED,
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ public void afterIndexRemoved(Index index, IndexSettings indexSettings, IndexRem
// it's fine to keep the contexts open if the index is still "alive"
// unfortunately we don't have a clear way to signal today why an index is closed.
// to release memory and let references to the filesystem go etc.
if (reason == IndexRemovalReason.DELETED || reason == IndexRemovalReason.CLOSED) {
if (reason == IndexRemovalReason.DELETED || reason == IndexRemovalReason.CLOSED || reason == IndexRemovalReason.REOPENED) {
tlrx marked this conversation as resolved.
Show resolved Hide resolved
freeAllContextForIndex(index);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.shards.ClusterShardLimitIT;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ValidationException;
Expand Down Expand Up @@ -210,7 +211,14 @@ public void testAddIndexClosedBlocks() {
for (Index index : indices) {
assertTrue(blockedIndices.containsKey(index));
if (mixedVersions) {
assertIsClosed(index.getName(), updatedState);
assertThat(updatedState.metaData().index(index).getState(), is(IndexMetaData.State.CLOSE));
assertTrue(updatedState.blocks().hasIndexBlock(index.getName(), MetaDataIndexStateService.INDEX_CLOSED_BLOCK));
assertThat("Index " + index + " must have only 1 block with id=" + MetaDataIndexStateService.INDEX_CLOSED_BLOCK_ID,
updatedState.blocks().indices().getOrDefault(index.getName(), emptySet()).stream().filter(clusterBlock ->
clusterBlock.id() == MetaDataIndexStateService.INDEX_CLOSED_BLOCK_ID).count(), equalTo(1L));

final IndexRoutingTable indexRoutingTable = updatedState.routingTable().index(index);
assertThat(indexRoutingTable, nullValue());
} else {
assertHasBlock(index.getName(), updatedState, blockedIndices.get(index));
}
Expand Down Expand Up @@ -346,19 +354,18 @@ private static ClusterState addIndex(final ClusterState currentState,
final ClusterState.Builder clusterStateBuilder = ClusterState.builder(currentState);
clusterStateBuilder.metaData(MetaData.builder(currentState.metaData()).put(indexMetaData, true));

if (state == IndexMetaData.State.OPEN) {
final IndexRoutingTable.Builder indexRoutingTable = IndexRoutingTable.builder(indexMetaData.getIndex());
for (int j = 0; j < indexMetaData.getNumberOfShards(); j++) {
ShardId shardId = new ShardId(indexMetaData.getIndex(), j);
IndexShardRoutingTable.Builder indexShardRoutingBuilder = new IndexShardRoutingTable.Builder(shardId);
indexShardRoutingBuilder.addShard(newShardRouting(shardId, randomAlphaOfLength(10), true, ShardRoutingState.STARTED));
for (int k = 0; k < indexMetaData.getNumberOfReplicas(); k++) {
indexShardRoutingBuilder.addShard(newShardRouting(shardId, randomAlphaOfLength(10), false, ShardRoutingState.STARTED));
}
indexRoutingTable.addIndexShard(indexShardRoutingBuilder.build());
final IndexRoutingTable.Builder indexRoutingTable = IndexRoutingTable.builder(indexMetaData.getIndex());
for (int j = 0; j < indexMetaData.getNumberOfShards(); j++) {
ShardId shardId = new ShardId(indexMetaData.getIndex(), j);
IndexShardRoutingTable.Builder indexShardRoutingBuilder = new IndexShardRoutingTable.Builder(shardId);
indexShardRoutingBuilder.addShard(newShardRouting(shardId, randomAlphaOfLength(10), true, ShardRoutingState.STARTED));
for (int k = 0; k < indexMetaData.getNumberOfReplicas(); k++) {
indexShardRoutingBuilder.addShard(newShardRouting(shardId, randomAlphaOfLength(10), false, ShardRoutingState.STARTED));
}
clusterStateBuilder.routingTable(RoutingTable.builder(currentState.routingTable()).add(indexRoutingTable).build());
indexRoutingTable.addIndexShard(indexShardRoutingBuilder.build());
}
clusterStateBuilder.routingTable(RoutingTable.builder(currentState.routingTable()).add(indexRoutingTable).build());

if (block != null) {
clusterStateBuilder.blocks(ClusterBlocks.builder().blocks(currentState.blocks()).addIndexBlock(index, block));
}
Expand All @@ -372,11 +379,19 @@ private static void assertIsOpened(final String indexName, final ClusterState cl

private static void assertIsClosed(final String indexName, final ClusterState clusterState) {
assertThat(clusterState.metaData().index(indexName).getState(), is(IndexMetaData.State.CLOSE));
assertThat(clusterState.routingTable().index(indexName), nullValue());
assertThat(clusterState.blocks().hasIndexBlock(indexName, MetaDataIndexStateService.INDEX_CLOSED_BLOCK), is(true));
assertThat("Index " + indexName + " must have only 1 block with [id=" + MetaDataIndexStateService.INDEX_CLOSED_BLOCK_ID + "]",
clusterState.blocks().indices().getOrDefault(indexName, emptySet()).stream()
.filter(clusterBlock -> clusterBlock.id() == MetaDataIndexStateService.INDEX_CLOSED_BLOCK_ID).count(), equalTo(1L));

final IndexRoutingTable indexRoutingTable = clusterState.routingTable().index(indexName);
assertThat(indexRoutingTable, notNullValue());

for(IndexShardRoutingTable shardRoutingTable : indexRoutingTable) {
assertThat(shardRoutingTable.shards().stream().allMatch(ShardRouting::unassigned), is(true));
assertThat(shardRoutingTable.shards().stream().map(ShardRouting::unassignedInfo).map(UnassignedInfo::getReason)
.allMatch(info -> info == UnassignedInfo.Reason.INDEX_CLOSED), is(true));
}
}

private static void assertHasBlock(final String indexName, final ClusterState clusterState, final ClusterBlock closingBlock) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.elasticsearch.index.Index;
import org.elasticsearch.snapshots.Snapshot;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.test.VersionUtils;

import java.io.IOException;
import java.nio.ByteBuffer;
Expand All @@ -54,6 +55,7 @@
import static org.hamcrest.Matchers.nullValue;

public class UnassignedInfoTests extends ESAllocationTestCase {

public void testReasonOrdinalOrder() {
UnassignedInfo.Reason[] order = new UnassignedInfo.Reason[]{
UnassignedInfo.Reason.INDEX_CREATED,
Expand All @@ -70,7 +72,8 @@ public void testReasonOrdinalOrder() {
UnassignedInfo.Reason.REALLOCATED_REPLICA,
UnassignedInfo.Reason.PRIMARY_FAILED,
UnassignedInfo.Reason.FORCED_EMPTY_PRIMARY,
UnassignedInfo.Reason.MANUAL_ALLOCATION,};
UnassignedInfo.Reason.MANUAL_ALLOCATION,
UnassignedInfo.Reason.INDEX_CLOSED,};
for (int i = 0; i < order.length; i++) {
assertThat(order[i].ordinal(), equalTo(i));
}
Expand All @@ -95,6 +98,21 @@ public void testSerialization() throws Exception {
assertThat(read.getNumFailedAllocations(), equalTo(meta.getNumFailedAllocations()));
}

public void testBwcSerialization() throws Exception {
final UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.INDEX_CLOSED, "message");
BytesStreamOutput out = new BytesStreamOutput();
out.setVersion(VersionUtils.randomVersionBetween(random(), Version.V_6_0_0, VersionUtils.getPreviousVersion(Version.V_7_0_0)));
unassignedInfo.writeTo(out);
out.close();

UnassignedInfo read = new UnassignedInfo(out.bytes().streamInput());
assertThat(read.getReason(), equalTo(UnassignedInfo.Reason.REINITIALIZED));
assertThat(read.getUnassignedTimeInMillis(), equalTo(unassignedInfo.getUnassignedTimeInMillis()));
assertThat(read.getMessage(), equalTo(unassignedInfo.getMessage()));
assertThat(read.getDetails(), equalTo(unassignedInfo.getDetails()));
assertThat(read.getNumFailedAllocations(), equalTo(unassignedInfo.getNumFailedAllocations()));
}

public void testIndexCreated() {
MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder("test").settings(settings(Version.CURRENT))
Expand Down
Loading