Skip to content

Commit

Permalink
Merge branch 'main' into 4967_prevent_deletion_of_snapshot_backing_index
Browse files Browse the repository at this point in the history
Signed-off-by: Vishal Sarda <[email protected]>
  • Loading branch information
Vishalks committed Nov 4, 2022
2 parents eabebd8 + 082f059 commit cca5ccc
Show file tree
Hide file tree
Showing 9 changed files with 262 additions and 70 deletions.
7 changes: 5 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- on-boarding of tasks([#4542](https://github.com/opensearch-project/OpenSearch/pull/4542))
- Integs ([#4588](https://github.com/opensearch-project/OpenSearch/pull/4588))
- Prevent deletion of snapshots that are backing searchable snapshot indexes ([#5069](https://github.com/opensearch-project/OpenSearch/pull/5069))

- Integration tests for searchable snapshots ([#5048](https://github.com/opensearch-project/OpenSearch/pull/5048))
### Dependencies
- Bumps `log4j-core` from 2.18.0 to 2.19.0
- Bumps `reactor-netty-http` from 1.0.18 to 1.0.23
Expand Down Expand Up @@ -130,6 +130,8 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- Refactored BalancedAllocator.Balancer to LocalShardsBalancer ([#4761](https://github.com/opensearch-project/OpenSearch/pull/4761))
- Fail weight update when decommission ongoing and fail decommission when attribute not weighed away ([#4839](https://github.com/opensearch-project/OpenSearch/pull/4839))
- Skip SymbolicLinkPreservingTarIT when running on Windows ([#5023](https://github.com/opensearch-project/OpenSearch/pull/5023))
- Change the output error message back to use OpenSearchException in the cause chain. ([#5081](https://github.com/opensearch-project/OpenSearch/pull/5081))

### Deprecated
### Removed
- Remove deprecated code to add node name into log pattern of log4j property file ([#4568](https://github.com/opensearch-project/OpenSearch/pull/4568))
Expand Down Expand Up @@ -207,7 +209,8 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- Backport failures for merge conflicts on CHANGELOG.md file ([#4977](https://github.com/opensearch-project/OpenSearch/pull/4977))
- Remove gradle-check dependency on precommit ([#5027](https://github.com/opensearch-project/OpenSearch/pull/5027))
- Fix version check for 2.x release for awareness attribute decommission ([#5034](https://github.com/opensearch-project/OpenSearch/pull/5034))

- Fix flaky test ResourceAwareTasksTests on Windows ([#5077](https://github.com/opensearch-project/OpenSearch/pull/5077))
- Length calculation for block based fetching ([#5055](https://github.com/opensearch-project/OpenSearch/pull/5055))
### Security
- CVE-2022-25857 org.yaml:snakeyaml DOS vulnerability ([#4341](https://github.com/opensearch-project/OpenSearch/pull/4341))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.opensearch.common.collect.Map;
import org.opensearch.common.io.PathUtils;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.ByteSizeUnit;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.Index;
import org.opensearch.index.IndexNotFoundException;
Expand Down Expand Up @@ -60,109 +61,204 @@ protected Settings.Builder randomRepositorySettings() {
return settings;
}

/**
 * Builds repository settings with a randomized "location" and "compress" flag and a
 * fixed "chunk_size" of {@code 2 << 13} (16384) bytes.
 *
 * @return a settings builder the caller can pass on when creating the repository
 */
private Settings.Builder chunkedRepositorySettings() {
    return Settings.builder()
        .put("location", randomRepoPath())
        .put("compress", randomBoolean())
        .put("chunk_size", 2 << 13, ByteSizeUnit.BYTES);
}

/**
* Tests a happy path scenario for searchable snapshots by creating 2 indices,
* taking a snapshot, restoring them as searchable snapshots.
* Ensures availability of sufficient data nodes and search capable nodes.
*/
public void testCreateSearchableSnapshot() throws Exception {
final int numReplicasIndex1 = randomIntBetween(1, 4);
final int numReplicasIndex2 = randomIntBetween(0, 2);
internalCluster().ensureAtLeastNumDataNodes(Math.max(numReplicasIndex1, numReplicasIndex2) + 1);
final Client client = client();
createRepository("test-repo", "fs");
createIndex(
"test-idx-1",
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, Integer.toString(numReplicasIndex1))
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, "1")
.build()
);
createIndex(
"test-idx-2",
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, Integer.toString(numReplicasIndex2))
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, "1")
.build()
);
ensureGreen();
indexRandomDocs("test-idx-1", 100);
indexRandomDocs("test-idx-2", 100);

logger.info("--> snapshot");
final CreateSnapshotResponse createSnapshotResponse = client.admin()
.cluster()
.prepareCreateSnapshot("test-repo", "test-snap")
.setWaitForCompletion(true)
.setIndices("test-idx-1", "test-idx-2")
.get();
MatcherAssert.assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0));
MatcherAssert.assertThat(
createSnapshotResponse.getSnapshotInfo().successfulShards(),
equalTo(createSnapshotResponse.getSnapshotInfo().totalShards())
);
internalCluster().ensureAtLeastNumDataNodes(Math.max(numReplicasIndex1, numReplicasIndex2) + 1);
createIndexWithDocsAndEnsureGreen(numReplicasIndex1, 100, "test-idx-1");
createIndexWithDocsAndEnsureGreen(numReplicasIndex2, 100, "test-idx-2");

assertTrue(client.admin().indices().prepareDelete("test-idx-1", "test-idx-2").get().isAcknowledged());
takeSnapshot(client, "test-idx-1", "test-idx-2");
deleteIndicesAndEnsureGreen(client, "test-idx-1", "test-idx-2");

internalCluster().ensureAtLeastNumSearchNodes(Math.max(numReplicasIndex1, numReplicasIndex2) + 1);

logger.info("--> restore indices as 'remote_snapshot'");
client.admin()
.cluster()
.prepareRestoreSnapshot("test-repo", "test-snap")
.setRenamePattern("(.+)")
.setRenameReplacement("$1-copy")
.setStorageType(RestoreSnapshotRequest.StorageType.REMOTE_SNAPSHOT)
.setWaitForCompletion(true)
.execute()
.actionGet();
ensureGreen();
restoreSnapshotAndEnsureGreen(client);

assertDocCount("test-idx-1-copy", 100L);
assertDocCount("test-idx-2-copy", 100L);
assertIndexDirectoryDoesNotExist("test-idx-1-copy", "test-idx-2-copy");
}

/**
 * Tests a chunked repository scenario for searchable snapshots by creating an index,
 * taking a snapshot into a repository configured with an explicit "chunk_size",
 * deleting the original index and restoring it as a searchable snapshot index.
 * Passes when the restored index reports all 1000 documents.
 */
public void testCreateSearchableSnapshotWithChunks() throws Exception {
final int numReplicasIndex = randomIntBetween(1, 4);
final String indexName = "test-idx";
// restoreSnapshotAndEnsureGreen renames restored indices with a "-copy" suffix.
final String restoredIndexName = indexName + "-copy";
final Client client = client();

// Repository settings that carry the "chunk_size" entry used by this scenario.
Settings.Builder repositorySettings = chunkedRepositorySettings();

// numReplicasIndex + 1: presumably one node per shard copy (primary plus replicas).
internalCluster().ensureAtLeastNumSearchAndDataNodes(numReplicasIndex + 1);
createIndexWithDocsAndEnsureGreen(numReplicasIndex, 1000, indexName);
takeSnapshot(client, repositorySettings, indexName);

// Remove the source index before restoring, so only the restored copy remains to be checked.
deleteIndicesAndEnsureGreen(client, indexName);
restoreSnapshotAndEnsureGreen(client);

assertDocCount(restoredIndexName, 1000L);
}

/**
 * Tests the functionality of remote shard allocation to
 * ensure it can assign remote shards to a node with local shards given it has the
 * search role capabilities.
 * <p>
 * The original index is intentionally not deleted before the restore, so the source index
 * and its "-copy" searchable snapshot index exist side by side and both are checked.
 */
public void testSearchableSnapshotAllocationForLocalAndRemoteShardsOnSameNode() throws Exception {
final int numReplicasIndex = randomIntBetween(1, 4);
final String indexName = "test-idx";
// restoreSnapshotAndEnsureGreen renames restored indices with a "-copy" suffix.
final String restoredIndexName = indexName + "-copy";
final Client client = client();

// numReplicasIndex + 1: presumably one node per shard copy (primary plus replicas).
internalCluster().ensureAtLeastNumSearchAndDataNodes(numReplicasIndex + 1);
createIndexWithDocsAndEnsureGreen(numReplicasIndex, 100, indexName);
takeSnapshot(client, indexName);

// No delete step here: the restored copy must coexist with the original index.
restoreSnapshotAndEnsureGreen(client);

// Both the restored index and the untouched original must report all documents.
assertDocCount(restoredIndexName, 100L);
assertDocCount(indexName, 100L);
}

/**
 * Tests the functionality of remote shard allocation to
 * ensure it can handle node drops for failover scenarios and the cluster gets back to a healthy state when
 * nodes with search capabilities are added back to the cluster.
 * <p>
 * Sequence: restore the index as a searchable snapshot (green), stop one search node (yellow),
 * stop another search node (red), request search nodes again (green), then stop one more
 * search node and expect the index to stay green.
 */
public void testSearchableSnapshotAllocationForFailoverAndRecovery() throws Exception {
// Fixed at one replica, unlike the other tests in this file that randomize the replica count.
final int numReplicasIndex = 1;
final String indexName = "test-idx";
// restoreSnapshotAndEnsureGreen renames restored indices with a "-copy" suffix.
final String restoredIndexName = indexName + "-copy";
final Client client = client();

internalCluster().ensureAtLeastNumDataNodes(numReplicasIndex + 1);
createIndexWithDocsAndEnsureGreen(numReplicasIndex, 100, indexName);

takeSnapshot(client, indexName);
deleteIndicesAndEnsureGreen(client, indexName);

// Search nodes are only requested right before the restore.
internalCluster().ensureAtLeastNumSearchNodes(numReplicasIndex + 1);
restoreSnapshotAndEnsureGreen(client);
assertDocCount(restoredIndexName, 100L);

// Losing one search node: the index is expected to go yellow but remain searchable.
logger.info("--> stop a random search node");
internalCluster().stopRandomSearchNode();
ensureYellow(restoredIndexName);
assertDocCount(restoredIndexName, 100L);

// Losing a second search node: the index is expected to go red.
logger.info("--> stop the last search node");
internalCluster().stopRandomSearchNode();
ensureRed(restoredIndexName);

// numReplicasIndex + 2 evaluates to 3 here, matching the count in the log message.
logger.info("--> add 3 new search nodes");
internalCluster().ensureAtLeastNumSearchNodes(numReplicasIndex + 2);
ensureGreen(restoredIndexName);
assertDocCount(restoredIndexName, 100);

// With search nodes restored, dropping a single one must not degrade the index health.
logger.info("--> stop a random search node");
internalCluster().stopRandomSearchNode();
ensureGreen(restoredIndexName);
assertDocCount(restoredIndexName, 100);
}

/**
 * Tests the functionality of index write block on a searchable snapshot index.
 * <p>
 * After restoring the index as a searchable snapshot, indexing and index setting changes
 * must be blocked, while deleting the restored index must still succeed.
 */
public void testSearchableSnapshotIndexIsReadOnly() throws Exception {
final String indexName = "test-index";
// restoreSnapshotAndEnsureGreen renames restored indices with a "-copy" suffix.
final String restoredIndexName = indexName + "-copy";
final Client client = client();
// NOTE(review): takeSnapshot(client, indices) also creates "test-repo" of type "fs";
// this explicit call looks redundant - confirm before removing.
createRepository("test-repo", "fs");

// Zero replicas: a single shard copy is enough for this scenario.
createIndexWithDocsAndEnsureGreen(0, 100, indexName);
takeSnapshot(client, indexName);
deleteIndicesAndEnsureGreen(client, indexName);

internalCluster().ensureAtLeastNumSearchNodes(1);
restoreSnapshotAndEnsureGreen(client);

assertIndexingBlocked(restoredIndexName);
assertIndexSettingChangeBlocked(restoredIndexName);
// Deletion is the one mutation that must remain possible on the restored index.
assertTrue(client.admin().indices().prepareDelete(restoredIndexName).get().isAcknowledged());
// Looking the index up after the delete must fail with IndexNotFoundException.
assertThrows(
"Expect index to not exist",
IndexNotFoundException.class,
() -> client.admin().indices().prepareGetIndex().setIndices(restoredIndexName).execute().actionGet()
);
}

private void createIndexWithDocsAndEnsureGreen(int numReplicasIndex, int numOfDocs, String indexName) throws InterruptedException {
createIndex(
indexName,
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, "0").put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, "1").build()
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, Integer.toString(numReplicasIndex))
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, "1")
.build()
);
ensureGreen();

logger.info("--> snapshot");
indexRandomDocs(indexName, numOfDocs);
ensureGreen();
}

/**
 * Takes a snapshot of the given indices without custom repository settings.
 * Delegates to the three-argument overload with {@code null} repository settings.
 *
 * @param client  client used to issue the snapshot request
 * @param indices names of the indices to include in the snapshot
 */
private void takeSnapshot(Client client, String... indices) {
    // A typed null makes the intended overload explicit to the reader.
    final Settings.Builder noRepositorySettings = null;
    takeSnapshot(client, noRepositorySettings, indices);
}

private void takeSnapshot(Client client, Settings.Builder repositorySettings, String... indices) {
logger.info("--> Create a repository");
if (repositorySettings == null) {
createRepository("test-repo", "fs");
} else {
createRepository("test-repo", "fs", repositorySettings);
}
logger.info("--> Take a snapshot");
final CreateSnapshotResponse createSnapshotResponse = client.admin()
.cluster()
.prepareCreateSnapshot("test-repo", "test-snap")
.setWaitForCompletion(true)
.setIndices(indexName)
.setIndices(indices)
.get();

MatcherAssert.assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0));
MatcherAssert.assertThat(
createSnapshotResponse.getSnapshotInfo().successfulShards(),
equalTo(createSnapshotResponse.getSnapshotInfo().totalShards())
);
}

assertTrue(client.admin().indices().prepareDelete(indexName).get().isAcknowledged());
/**
 * Deletes the given indices, asserts that the delete request was acknowledged and then
 * calls {@code ensureGreen()} before returning.
 *
 * @param client  client used to issue the delete request
 * @param indices names of the indices to delete
 */
private void deleteIndicesAndEnsureGreen(Client client, String... indices) {
assertTrue(client.admin().indices().prepareDelete(indices).get().isAcknowledged());
ensureGreen();
}

private void restoreSnapshotAndEnsureGreen(Client client) {
logger.info("--> restore indices as 'remote_snapshot'");
client.admin()
.cluster()
.prepareRestoreSnapshot("test-repo", "test-snap")
.setRenamePattern("(.+)")
.setRenameReplacement("$1")
.setRenameReplacement("$1-copy")
.setStorageType(RestoreSnapshotRequest.StorageType.REMOTE_SNAPSHOT)
.setWaitForCompletion(true)
.execute()
.actionGet();
ensureGreen();

assertIndexingBlocked(indexName);
assertIndexSettingChangeBlocked(indexName);
assertTrue(client.admin().indices().prepareDelete(indexName).get().isAcknowledged());
assertThrows(
"Expect index to not exist",
IndexNotFoundException.class,
() -> client.admin().indices().prepareGetIndex().setIndices(indexName).execute().actionGet()
);
}

private void assertIndexingBlocked(String index) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -588,7 +588,7 @@ public static void generateFailureXContent(XContentBuilder builder, Params param
}
t = t.getCause();
}
builder.field(ERROR, ExceptionsHelper.summaryMessage(e));
builder.field(ERROR, ExceptionsHelper.summaryMessage(t != null ? t : e));
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.opensearch.cluster.routing.allocation.decider.Decision;
import org.opensearch.cluster.routing.allocation.decider.DiskThresholdDecider;
import org.opensearch.common.Randomness;
import org.opensearch.cluster.routing.RecoverySource;

import java.util.ArrayDeque;
import java.util.ArrayList;
Expand Down Expand Up @@ -57,6 +58,7 @@ public RemoteShardsBalancer(Logger logger, RoutingAllocation allocation) {
*/
@Override
void allocateUnassigned() {
unassignIgnoredRemoteShards(allocation);
if (routingNodes.unassigned().isEmpty()) {
logger.debug("No unassigned remote shards found.");
return;
Expand Down Expand Up @@ -273,7 +275,6 @@ MoveDecision decideRebalance(ShardRouting shardRouting) {
*/
public Map<String, UnassignedIndexShards> groupUnassignedShardsByIndex() {
HashMap<String, UnassignedIndexShards> unassignedShardMap = new HashMap<>();
unassignIgnoredRemoteShards(allocation);
for (ShardRouting shard : routingNodes.unassigned().drain()) {
String index = shard.getIndexName();
if (!RoutingPool.REMOTE_CAPABLE.equals(RoutingPool.getShardPool(shard, allocation))) {
Expand All @@ -298,7 +299,17 @@ private void unassignIgnoredRemoteShards(RoutingAllocation routingAllocation) {
for (ShardRouting shard : unassignedShards.drainIgnored()) {
RoutingPool pool = RoutingPool.getShardPool(shard, routingAllocation);
if (pool == RoutingPool.REMOTE_CAPABLE && shard.unassigned() && (shard.primary() || !shard.unassignedInfo().isDelayed())) {
unassignedShards.add(shard);
ShardRouting unassignedShard = shard;
// Shard when moved to an unassigned state updates the recovery source to be ExistingStoreRecoverySource
// Remote shards do not have an existing store to recover from and can be recovered from an empty source
// to re-fetch any shard blocks from the repository.
if (shard.primary()) {
if (!RecoverySource.Type.SNAPSHOT.equals(shard.recoverySource().getType())) {
unassignedShard = shard.updateUnassigned(shard.unassignedInfo(), RecoverySource.EmptyStoreRecoverySource.INSTANCE);
}
}

unassignedShards.add(unassignedShard);
} else {
unassignedShards.ignoreShard(shard, shard.unassignedInfo().getLastAllocationStatus(), routingAllocation.changes());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,11 +137,11 @@ protected IndexInput fetchBlock(int blockId) throws IOException {
final long partStart = part * partSize;

final long position = blockStart - partStart;
final long offset = blockEnd - blockStart - partStart;
final long length = blockEnd - blockStart;

BlobFetchRequest blobFetchRequest = BlobFetchRequest.builder()
.position(position)
.length(offset)
.length(length)
.blobName(fileInfo.partName(part))
.directory(directory)
.fileName(blockFileName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,9 @@ public Builder position(long position) {
}

/**
 * Sets the length of the range to fetch for this blob fetch request.
 *
 * @param length length to fetch; must be strictly positive
 * @return this builder, to allow call chaining
 * @throws IllegalArgumentException if {@code length} is zero or negative
 */
public Builder length(long length) {
    // Zero is rejected as well, so the message says "positive" rather than "non-negative".
    if (length <= 0) {
        throw new IllegalArgumentException("Length for blob fetch request needs to be positive, but was [" + length + "]");
    }
    this.length = length;
    return this;
}
Expand Down
Loading

0 comments on commit cca5ccc

Please sign in to comment.