Skip to content

Commit

Permalink
Demonstrate unsafe close
Browse files Browse the repository at this point in the history
  • Loading branch information
henningandersen committed Oct 12, 2023
1 parent d210461 commit 5557cb4
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -166,16 +166,39 @@ public void testCloseAlreadyClosedIndex() throws Exception {
public void testCloseUnassignedIndex() throws Exception {
final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
assertAcked(
prepareCreate(indexName).setWaitForActiveShards(ActiveShardCount.NONE)
.setSettings(Settings.builder().put("index.routing.allocation.include._name", "nothing").build())
prepareCreate(indexName).setSettings(Settings.builder().put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), "1m")
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build())
);

final ClusterState clusterState = clusterAdmin().prepareState().get().getState();
assertThat(clusterState.metadata().indices().get(indexName).getState(), is(IndexMetadata.State.OPEN));
assertThat(clusterState.routingTable().allShards().allMatch(ShardRouting::unassigned), is(true));
final int nbDocs = randomIntBetween(1, 1);
indexRandom(
randomBoolean(),
false,
randomBoolean(),
IntStream.range(0, nbDocs)
.mapToObj(i -> client().prepareIndex(indexName).setId(String.valueOf(i)).setSource("num", i))
.collect(toList())
);

assertBusy(() -> closeIndices(indicesAdmin().prepareClose(indexName).setWaitForActiveShards(ActiveShardCount.NONE)));
internalCluster().restartNode(internalCluster().nodesInclude(indexName).iterator().next(), new InternalTestCluster.RestartCallback() {
@Override
public Settings onNodeStopped(String nodeName) throws Exception {
closeIndices(indicesAdmin().prepareClose(indexName).setWaitForActiveShards(ActiveShardCount.NONE));

return super.onNodeStopped(nodeName);
}
});
assertIndexIsClosed(indexName);


String newNode = internalCluster().startDataOnlyNode();
// assertAcked(client().admin().indices().prepareUpdateSettings(indexName).setSettings(Settings.builder().put("index.routing.allocation.include._name", newNode)));
// waitForRelocation();

assertAcked(client().admin().indices().prepareOpen(indexName));

assertHitCount(client().prepareSearch(indexName).setSize(0).get(), nbDocs);

}

public void testConcurrentClose() throws InterruptedException {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.indices.state;

import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.InternalTestCluster;

import java.util.Locale;
import java.util.stream.IntStream;

import static java.util.stream.Collectors.toList;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;

@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, numClientNodes = 0)
public class UnsafeCloseIndexIT extends ESIntegTestCase {
public void testCloseUnassignedIndex() throws Exception {
internalCluster().startMasterOnlyNode();
internalCluster().startDataOnlyNode();
final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
assertAcked(
prepareCreate(indexName).setSettings(Settings.builder().put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), "1m")
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build())
);

final int nbDocs = randomIntBetween(1, 1);
indexRandom(
randomBoolean(),
false,
randomBoolean(),
IntStream.range(0, nbDocs)
.mapToObj(i -> client().prepareIndex(indexName).setId(String.valueOf(i)).setSource("num", i))
.collect(toList())
);

internalCluster().restartNode(internalCluster().nodesInclude(indexName).iterator().next(), new InternalTestCluster.RestartCallback() {
@Override
public Settings onNodeStopped(String nodeName) throws Exception {
assertAcked(indicesAdmin().prepareClose(indexName).setWaitForActiveShards(ActiveShardCount.NONE));

return super.onNodeStopped(nodeName);
}
});
CloseIndexIT.assertIndexIsClosed(indexName);


String newNode = internalCluster().startDataOnlyNode();
assertAcked(client().admin().indices().prepareUpdateSettings(indexName).setSettings(Settings.builder().put("index.routing.allocation.include._name", newNode)));
waitForRelocation();

assertAcked(client().admin().indices().prepareOpen(indexName));

assertHitCount(client().prepareSearch(indexName).setSize(0).get(), nbDocs);

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -1871,7 +1871,7 @@ public void flushAndClose() throws IOException {
// TODO we might force a flush in the future since we have the write lock already even though recoveries
// are running.
// TODO: We are not waiting for full durability here atm because we are on the cluster state update thread
flush(false, false, ActionListener.noop());
// flush(false, false, ActionListener.noop());
} catch (AlreadyClosedException ex) {
logger.debug("engine already closed - skipping flushAndClose");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2800,7 +2800,7 @@ public void maybeSyncGlobalCheckpoint(final String reason) {
|| trackedGlobalCheckpointsNeedSync;
// only sync if index is not closed and there is a shard lagging the primary
if (syncNeeded && indexSettings.getIndexMetadata().getState() == IndexMetadata.State.OPEN) {
syncGlobalCheckpoints(reason);
// syncGlobalCheckpoints(reason);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ assert calledFromOutsideOrViaTragedyClose()
if (closed.compareAndSet(false, true)) {
try (ReleasableLock lock = writeLock.acquire()) {
try {
current.sync();
// current.sync();
} finally {
closeFilesIfNoPendingRetentionLocks();
}
Expand Down

0 comments on commit 5557cb4

Please sign in to comment.