Skip to content

Commit

Permalink
Merge branch 'main' into main-simplify-rrt
Browse files Browse the repository at this point in the history
Signed-off-by: Shailendra Singh <[email protected]>
  • Loading branch information
Shailendra Singh committed Sep 2, 2024
2 parents 50c98d0 + b54e867 commit 33535ed
Show file tree
Hide file tree
Showing 21 changed files with 456 additions and 152 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Reader Writer Separation] Add searchOnly replica routing configuration ([#15410](https://github.com/opensearch-project/OpenSearch/pull/15410))
- [Workload Management] Add query group level failure tracking ([#15227](https://github.com/opensearch-project/OpenSearch/pull/15527))
- Add support to upload snapshot shard blobs with hashed prefix ([#15426](https://github.com/opensearch-project/OpenSearch/pull/15426))
- [Remote Publication] Add remote download stats ([#15291](https://github.com/opensearch-project/OpenSearch/pull/15291)))

### Dependencies
- Bump `netty` from 4.1.111.Final to 4.1.112.Final ([#15081](https://github.com/opensearch-project/OpenSearch/pull/15081))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.Base64;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

Expand All @@ -40,6 +41,7 @@
import static org.opensearch.gateway.remote.RemoteClusterStateCleanupManager.RETAINED_MANIFESTS;
import static org.opensearch.gateway.remote.RemoteClusterStateCleanupManager.SKIP_CLEANUP_STATE_CHANGES;
import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING;
import static org.opensearch.gateway.remote.RemoteUploadStats.REMOTE_UPLOAD;
import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_TABLE;
import static org.opensearch.indices.IndicesService.CLUSTER_DEFAULT_INDEX_REFRESH_INTERVAL_SETTING;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY;
Expand Down Expand Up @@ -253,11 +255,13 @@ private void verifyIndexRoutingFilesDeletion(
DiscoveryStats discoveryStats = nodesStatsResponse.getNodes().get(0).getDiscoveryStats();
assertNotNull(discoveryStats.getClusterStateStats());
for (PersistedStateStats persistedStateStats : discoveryStats.getClusterStateStats().getPersistenceStats()) {
Map<String, AtomicLong> extendedFields = persistedStateStats.getExtendedFields();
assertTrue(extendedFields.containsKey(RemotePersistenceStats.INDEX_ROUTING_FILES_CLEANUP_ATTEMPT_FAILED_COUNT));
long cleanupAttemptFailedCount = extendedFields.get(RemotePersistenceStats.INDEX_ROUTING_FILES_CLEANUP_ATTEMPT_FAILED_COUNT)
.get();
assertEquals(0, cleanupAttemptFailedCount);
if (Objects.equals(persistedStateStats.getStatsName(), REMOTE_UPLOAD)) {
Map<String, AtomicLong> extendedFields = persistedStateStats.getExtendedFields();
assertTrue(extendedFields.containsKey(RemoteUploadStats.INDEX_ROUTING_FILES_CLEANUP_ATTEMPT_FAILED_COUNT));
long cleanupAttemptFailedCount = extendedFields.get(RemoteUploadStats.INDEX_ROUTING_FILES_CLEANUP_ATTEMPT_FAILED_COUNT)
.get();
assertEquals(0, cleanupAttemptFailedCount);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,15 @@

package org.opensearch.gateway.remote;

import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.opensearch.action.admin.cluster.state.ClusterStateResponse;
import org.opensearch.client.Client;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.discovery.DiscoveryStats;
import org.opensearch.gateway.remote.model.RemoteClusterMetadataManifest;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase;
Expand Down Expand Up @@ -155,6 +158,38 @@ public void testRemotePublicationDisableIfRemoteStateDisabled() {
assertNull(internalCluster().getCurrentClusterManagerNodeInstance(RemoteClusterStateService.class));
}

public void testRemotePublicationDownloadStats() {
int shardCount = randomIntBetween(1, 2);
int replicaCount = 1;
int dataNodeCount = shardCount * (replicaCount + 1);
int clusterManagerNodeCount = 1;
prepareCluster(clusterManagerNodeCount, dataNodeCount, INDEX_NAME, replicaCount, shardCount);
String dataNode = internalCluster().getDataNodeNames().stream().collect(Collectors.toList()).get(0);

NodesStatsResponse nodesStatsResponseDataNode = client().admin()
.cluster()
.prepareNodesStats(dataNode)
.addMetric(NodesStatsRequest.Metric.DISCOVERY.metricName())
.get();

assertDataNodeDownloadStats(nodesStatsResponseDataNode);

}

private void assertDataNodeDownloadStats(NodesStatsResponse nodesStatsResponse) {
// assert cluster state stats for data node
DiscoveryStats dataNodeDiscoveryStats = nodesStatsResponse.getNodes().get(0).getDiscoveryStats();
assertNotNull(dataNodeDiscoveryStats.getClusterStateStats());
assertEquals(0, dataNodeDiscoveryStats.getClusterStateStats().getUpdateSuccess());
assertTrue(dataNodeDiscoveryStats.getClusterStateStats().getPersistenceStats().get(0).getSuccessCount() > 0);
assertEquals(0, dataNodeDiscoveryStats.getClusterStateStats().getPersistenceStats().get(0).getFailedCount());
assertTrue(dataNodeDiscoveryStats.getClusterStateStats().getPersistenceStats().get(0).getTotalTimeInMillis() > 0);

assertTrue(dataNodeDiscoveryStats.getClusterStateStats().getPersistenceStats().get(1).getSuccessCount() > 0);
assertEquals(0, dataNodeDiscoveryStats.getClusterStateStats().getPersistenceStats().get(1).getFailedCount());
assertTrue(dataNodeDiscoveryStats.getClusterStateStats().getPersistenceStats().get(1).getTotalTimeInMillis() > 0);
}

private Map<String, Integer> getMetadataFiles(BlobStoreRepository repository, String subDirectory) throws IOException {
BlobPath metadataPath = repository.basePath()
.add(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -839,18 +839,13 @@ public void testCreateSnapshotV2() throws Exception {

String snapshotName2 = "test-create-snapshot2";

// verify even if waitForCompletion is not true, the request executes in a sync manner
CreateSnapshotResponse createSnapshotResponse2 = client().admin()
// verify response status if waitForCompletion is not true
RestStatus createSnapshotResponseStatus = client().admin()
.cluster()
.prepareCreateSnapshot(snapshotRepoName, snapshotName2)
.get();
snapshotInfo = createSnapshotResponse2.getSnapshotInfo();
assertThat(snapshotInfo.state(), equalTo(SnapshotState.SUCCESS));
assertThat(snapshotInfo.successfulShards(), greaterThan(0));
assertThat(snapshotInfo.successfulShards(), equalTo(snapshotInfo.totalShards()));
assertThat(snapshotInfo.snapshotId().getName(), equalTo(snapshotName2));
assertThat(snapshotInfo.getPinnedTimestamp(), greaterThan(0L));

.get()
.status();
assertEquals(RestStatus.ACCEPTED, createSnapshotResponseStatus);
}

public void testMixedSnapshotCreationWithV2RepositorySetting() throws Exception {
Expand Down Expand Up @@ -914,6 +909,7 @@ public void testMixedSnapshotCreationWithV2RepositorySetting() throws Exception
CreateSnapshotResponse createSnapshotResponse2 = client().admin()
.cluster()
.prepareCreateSnapshot(snapshotRepoName, snapshotName2)
.setWaitForCompletion(true)
.get();
snapshotInfo = createSnapshotResponse2.getSnapshotInfo();
assertThat(snapshotInfo.state(), equalTo(SnapshotState.SUCCESS));
Expand Down Expand Up @@ -968,6 +964,7 @@ public void testConcurrentSnapshotV2CreateOperation() throws InterruptedExceptio
CreateSnapshotResponse createSnapshotResponse2 = client().admin()
.cluster()
.prepareCreateSnapshot(snapshotRepoName, snapshotName)
.setWaitForCompletion(true)
.get();
SnapshotInfo snapshotInfo = createSnapshotResponse2.getSnapshotInfo();
assertThat(snapshotInfo.state(), equalTo(SnapshotState.SUCCESS));
Expand Down Expand Up @@ -1036,6 +1033,7 @@ public void testCreateSnapshotV2WithRedIndex() throws Exception {
CreateSnapshotResponse createSnapshotResponse2 = client().admin()
.cluster()
.prepareCreateSnapshot(snapshotRepoName, snapshotName1)
.setWaitForCompletion(true)
.get();
SnapshotInfo snapshotInfo = createSnapshotResponse2.getSnapshotInfo();
assertThat(snapshotInfo.state(), equalTo(SnapshotState.SUCCESS));
Expand Down Expand Up @@ -1097,6 +1095,7 @@ public void testCreateSnapshotV2WithIndexingLoad() throws Exception {
CreateSnapshotResponse createSnapshotResponse2 = client().admin()
.cluster()
.prepareCreateSnapshot(snapshotRepoName, snapshotName1)
.setWaitForCompletion(true)
.get();

SnapshotInfo snapshotInfo = createSnapshotResponse2.getSnapshotInfo();
Expand Down Expand Up @@ -1203,6 +1202,7 @@ public void testClusterManagerFailoverDuringSnapshotCreation() throws Exception
CreateSnapshotResponse createSnapshotResponse = client().admin()
.cluster()
.prepareCreateSnapshot(snapshotRepoName, snapshotName1)
.setWaitForCompletion(true)
.get();
snapshotInfo[0] = createSnapshotResponse.getSnapshotInfo();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,9 @@
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.metadata.RepositoriesMetadata;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.index.remote.RemoteStoreEnums.PathHashAlgorithm;
import org.opensearch.index.remote.RemoteStoreEnums.PathType;
import org.opensearch.index.remote.RemoteStorePathStrategy;
import org.opensearch.repositories.IndexId;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
Expand All @@ -62,7 +58,7 @@
import java.util.Map;
import java.util.stream.Stream;

import static org.opensearch.index.remote.RemoteStoreEnums.PathHashAlgorithm.FNV_1A_COMPOSITE_1;
import static org.opensearch.test.OpenSearchIntegTestCase.resolvePath;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertRequestBuilderThrows;
import static org.hamcrest.Matchers.containsString;
Expand Down Expand Up @@ -683,16 +679,4 @@ private void assertRepositoryBlocked(Client client, String repo, String existing
containsString("Could not read repository data because the contents of the repository do not match its expected state.")
);
}

private static String resolvePath(IndexId indexId, String shardId) {
PathType pathType = PathType.fromCode(indexId.getShardPathType());
RemoteStorePathStrategy.SnapshotShardPathInput shardPathInput = new RemoteStorePathStrategy.SnapshotShardPathInput.Builder()
.basePath(BlobPath.cleanPath())
.indexUUID(indexId.getId())
.shardId(shardId)
.build();
PathHashAlgorithm pathHashAlgorithm = pathType != PathType.FIXED ? FNV_1A_COMPOSITE_1 : null;
BlobPath blobPath = pathType.path(shardPathInput, pathHashAlgorithm);
return blobPath.buildAsString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -110,10 +110,17 @@ public void testRepositoryCreation() throws Exception {
assertThat(findRepository(repositoriesResponse.repositories(), "test-repo-1"), notNullValue());
assertThat(findRepository(repositoriesResponse.repositories(), "test-repo-2"), notNullValue());

RepositoryMetadata testRepo1Md = findRepository(repositoriesResponse.repositories(), "test-repo-1");

logger.info("--> check that trying to create a repository with the same settings repeatedly does not update cluster state");
String beforeStateUuid = clusterStateResponse.getState().stateUUID();
createRepository("test-repo-1", "fs", Settings.builder().put("location", location));
assertEquals(beforeStateUuid, client.admin().cluster().prepareState().clear().get().getState().stateUUID());
repositoriesResponse = client.admin().cluster().prepareGetRepositories(randomFrom("_all", "*", "test-repo-*")).get();
RepositoryMetadata testRepo1MdAfterUpdate = findRepository(repositoriesResponse.repositories(), "test-repo-1");

if (testRepo1Md.settings().equals(testRepo1MdAfterUpdate.settings())) {
assertEquals(beforeStateUuid, client.admin().cluster().prepareState().clear().get().getState().stateUUID());
}

logger.info("--> delete repository test-repo-1");
client.admin().cluster().prepareDeleteRepository("test-repo-1").get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.core.common.Strings;
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.repositories.IndexId;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.threadpool.ThreadPool;

Expand All @@ -62,6 +63,7 @@
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static org.opensearch.test.OpenSearchIntegTestCase.resolvePath;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
Expand Down Expand Up @@ -200,11 +202,9 @@ public void testExceptionOnMissingShardLevelSnapBlob() throws IOException {
final SnapshotInfo snapshotInfo = createFullSnapshot("test-repo", "test-snap");

logger.info("--> delete shard-level snap-${uuid}.dat file for one shard in this snapshot to simulate concurrent delete");
final String indexRepoId = getRepositoryData("test-repo").resolveIndexId(snapshotInfo.indices().get(0)).getId();
IndexId indexId = getRepositoryData("test-repo").resolveIndexId(snapshotInfo.indices().get(0));
IOUtils.rm(
repoPath.resolve("indices")
.resolve(indexRepoId)
.resolve("0")
repoPath.resolve(resolvePath(indexId, "0"))
.resolve(BlobStoreRepository.SNAPSHOT_PREFIX + snapshotInfo.snapshotId().getUUID() + ".dat")
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,10 @@ protected void clusterManagerOperation(
) {
Repository repository = repositoriesService.repository(request.repository());
boolean isSnapshotV2 = SHALLOW_SNAPSHOT_V2.get(repository.getMetadata().settings());
if (request.waitForCompletion() || isSnapshotV2) {
if (request.waitForCompletion()) {
snapshotsService.executeSnapshot(request, ActionListener.map(listener, CreateSnapshotResponse::new));
} else if (isSnapshotV2) {
snapshotsService.executeSnapshot(request, ActionListener.map(listener, snapshot -> new CreateSnapshotResponse()));
} else {
snapshotsService.createSnapshot(request, ActionListener.map(listener, snapshot -> new CreateSnapshotResponse()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -901,6 +901,10 @@ public DiscoveryStats stats() {
stats.add(persistedStateRegistry.getPersistedState(stateType).getStats());
}
});
if (coordinationState.get().isRemotePublicationEnabled()) {
stats.add(publicationHandler.getFullDownloadStats());
stats.add(publicationHandler.getDiffDownloadStats());
}
clusterStateStats.setPersistenceStats(stats);
return new DiscoveryStats(new PendingClusterStateStats(0, 0, 0), publicationHandler.stats(), clusterStateStats);
}
Expand Down
Loading

0 comments on commit 33535ed

Please sign in to comment.