Skip to content

Commit

Permalink
[Remote Publication] Add remote download stats (#15291)
Browse files Browse the repository at this point in the history
---------
Signed-off-by: Arpit Bandejiya <[email protected]>

(cherry picked from commit b54e867)
  • Loading branch information
Arpit-Bandejiya committed Sep 2, 2024
1 parent 8a9c167 commit da098ed
Show file tree
Hide file tree
Showing 15 changed files with 416 additions and 122 deletions.
8 changes: 5 additions & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,16 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Workload Management] Add query group level failure tracking ([#15227](https://github.com/opensearch-project/OpenSearch/pull/15527))
- [Reader Writer Separation] Add searchOnly replica routing configuration ([#15410](https://github.com/opensearch-project/OpenSearch/pull/15410))
- Add index creation using the context field ([#15290](https://github.com/opensearch-project/OpenSearch/pull/15290))
- [Remote Publication] Add remote download stats ([#15291](https://github.com/opensearch-project/OpenSearch/pull/15291)))

### Dependencies
- Bump `netty` from 4.1.111.Final to 4.1.112.Final ([#15081](https://github.com/opensearch-project/OpenSearch/pull/15081))
- Bump `org.apache.commons:commons-lang3` from 3.14.0 to 3.15.0 ([#14861](https://github.com/opensearch-project/OpenSearch/pull/14861))
- Bump `org.apache.commons:commons-lang3` from 3.14.0 to 3.16.0 ([#14861](https://github.com/opensearch-project/OpenSearch/pull/14861), [#15205](https://github.com/opensearch-project/OpenSearch/pull/15205))
- OpenJDK Update (July 2024 Patch releases) ([#14998](https://github.com/opensearch-project/OpenSearch/pull/14998))
- Bump `com.microsoft.azure:msal4j` from 1.16.1 to 1.17.0 ([#14995](https://github.com/opensearch-project/OpenSearch/pull/14995), [#15420](https://github.com/opensearch-project/OpenSearch/pull/15420))
- Bump `actions/github-script` from 6 to 7 ([#14997](https://github.com/opensearch-project/OpenSearch/pull/14997))
- Bump `org.tukaani:xz` from 1.9 to 1.10 ([#15110](https://github.com/opensearch-project/OpenSearch/pull/15110))
- Bump `actions/setup-java` from 1 to 4 ([#15104](https://github.com/opensearch-project/OpenSearch/pull/15104))
- Bump `org.apache.avro:avro` from 1.11.3 to 1.12.0 in /plugins/repository-hdfs ([#15119](https://github.com/opensearch-project/OpenSearch/pull/15119))
- Bump `org.bouncycastle:bcpg-fips` from 1.0.7.1 to 2.0.9 ([#15103](https://github.com/opensearch-project/OpenSearch/pull/15103), [#15299](https://github.com/opensearch-project/OpenSearch/pull/15299))
- Bump `com.azure:azure-core` from 1.49.1 to 1.51.0 ([#15111](https://github.com/opensearch-project/OpenSearch/pull/15111))
Expand Down Expand Up @@ -82,13 +84,13 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Fix delete index template failed when the index template matches a data stream but is unused ([#15080](https://github.com/opensearch-project/OpenSearch/pull/15080))
- Fix array_index_out_of_bounds_exception when indexing documents with field name containing only dot ([#15126](https://github.com/opensearch-project/OpenSearch/pull/15126))
- Fixed array field name omission in flat_object function for nested JSON ([#13620](https://github.com/opensearch-project/OpenSearch/pull/13620))
- Fix range aggregation optimization ignoring top level queries ([#15194](https://github.com/opensearch-project/OpenSearch/pull/15194))
- Fix incorrect parameter names in MinHash token filter configuration handling ([#15233](https://github.com/opensearch-project/OpenSearch/pull/15233))
- Fix range aggregation optimization ignoring top level queries ([#15287](https://github.com/opensearch-project/OpenSearch/pull/15287))
- Fix indexing error when flat_object field is explicitly null ([#15375](https://github.com/opensearch-project/OpenSearch/pull/15375))
- Fix split response processor not included in allowlist ([#15393](https://github.com/opensearch-project/OpenSearch/pull/15393))
- Fix unchecked cast in dynamic action map getter ([#15394](https://github.com/opensearch-project/OpenSearch/pull/15394))
- Fix null values indexed as "null" strings in flat_object field ([#14069](https://github.com/opensearch-project/OpenSearch/pull/14069))

### Security

[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.16...2.x
[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.15...2.x
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.Base64;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

Expand All @@ -40,6 +41,7 @@
import static org.opensearch.gateway.remote.RemoteClusterStateCleanupManager.RETAINED_MANIFESTS;
import static org.opensearch.gateway.remote.RemoteClusterStateCleanupManager.SKIP_CLEANUP_STATE_CHANGES;
import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING;
import static org.opensearch.gateway.remote.RemoteUploadStats.REMOTE_UPLOAD;
import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_TABLE;
import static org.opensearch.indices.IndicesService.CLUSTER_DEFAULT_INDEX_REFRESH_INTERVAL_SETTING;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY;
Expand Down Expand Up @@ -253,11 +255,13 @@ private void verifyIndexRoutingFilesDeletion(
DiscoveryStats discoveryStats = nodesStatsResponse.getNodes().get(0).getDiscoveryStats();
assertNotNull(discoveryStats.getClusterStateStats());
for (PersistedStateStats persistedStateStats : discoveryStats.getClusterStateStats().getPersistenceStats()) {
Map<String, AtomicLong> extendedFields = persistedStateStats.getExtendedFields();
assertTrue(extendedFields.containsKey(RemotePersistenceStats.INDEX_ROUTING_FILES_CLEANUP_ATTEMPT_FAILED_COUNT));
long cleanupAttemptFailedCount = extendedFields.get(RemotePersistenceStats.INDEX_ROUTING_FILES_CLEANUP_ATTEMPT_FAILED_COUNT)
.get();
assertEquals(0, cleanupAttemptFailedCount);
if (Objects.equals(persistedStateStats.getStatsName(), REMOTE_UPLOAD)) {
Map<String, AtomicLong> extendedFields = persistedStateStats.getExtendedFields();
assertTrue(extendedFields.containsKey(RemoteUploadStats.INDEX_ROUTING_FILES_CLEANUP_ATTEMPT_FAILED_COUNT));
long cleanupAttemptFailedCount = extendedFields.get(RemoteUploadStats.INDEX_ROUTING_FILES_CLEANUP_ATTEMPT_FAILED_COUNT)
.get();
assertEquals(0, cleanupAttemptFailedCount);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,15 @@

package org.opensearch.gateway.remote;

import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.opensearch.action.admin.cluster.state.ClusterStateResponse;
import org.opensearch.client.Client;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.discovery.DiscoveryStats;
import org.opensearch.gateway.remote.model.RemoteClusterMetadataManifest;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase;
Expand Down Expand Up @@ -155,6 +158,38 @@ public void testRemotePublicationDisableIfRemoteStateDisabled() {
assertNull(internalCluster().getCurrentClusterManagerNodeInstance(RemoteClusterStateService.class));
}

public void testRemotePublicationDownloadStats() {
int shardCount = randomIntBetween(1, 2);
int replicaCount = 1;
int dataNodeCount = shardCount * (replicaCount + 1);
int clusterManagerNodeCount = 1;
prepareCluster(clusterManagerNodeCount, dataNodeCount, INDEX_NAME, replicaCount, shardCount);
String dataNode = internalCluster().getDataNodeNames().stream().collect(Collectors.toList()).get(0);

NodesStatsResponse nodesStatsResponseDataNode = client().admin()
.cluster()
.prepareNodesStats(dataNode)
.addMetric(NodesStatsRequest.Metric.DISCOVERY.metricName())
.get();

assertDataNodeDownloadStats(nodesStatsResponseDataNode);

}

private void assertDataNodeDownloadStats(NodesStatsResponse nodesStatsResponse) {
// assert cluster state stats for data node
DiscoveryStats dataNodeDiscoveryStats = nodesStatsResponse.getNodes().get(0).getDiscoveryStats();
assertNotNull(dataNodeDiscoveryStats.getClusterStateStats());
assertEquals(0, dataNodeDiscoveryStats.getClusterStateStats().getUpdateSuccess());
assertTrue(dataNodeDiscoveryStats.getClusterStateStats().getPersistenceStats().get(0).getSuccessCount() > 0);
assertEquals(0, dataNodeDiscoveryStats.getClusterStateStats().getPersistenceStats().get(0).getFailedCount());
assertTrue(dataNodeDiscoveryStats.getClusterStateStats().getPersistenceStats().get(0).getTotalTimeInMillis() > 0);

assertTrue(dataNodeDiscoveryStats.getClusterStateStats().getPersistenceStats().get(1).getSuccessCount() > 0);
assertEquals(0, dataNodeDiscoveryStats.getClusterStateStats().getPersistenceStats().get(1).getFailedCount());
assertTrue(dataNodeDiscoveryStats.getClusterStateStats().getPersistenceStats().get(1).getTotalTimeInMillis() > 0);
}

private Map<String, Integer> getMetadataFiles(BlobStoreRepository repository, String subDirectory) throws IOException {
BlobPath metadataPath = repository.basePath()
.add(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -902,6 +902,10 @@ public DiscoveryStats stats() {
stats.add(persistedStateRegistry.getPersistedState(stateType).getStats());
}
});
if (coordinationState.get().isRemotePublicationEnabled()) {
stats.add(publicationHandler.getFullDownloadStats());
stats.add(publicationHandler.getDiffDownloadStats());
}
clusterStateStats.setPersistenceStats(stats);
return new DiscoveryStats(new PendingClusterStateStats(0, 0, 0), publicationHandler.stats(), clusterStateStats);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,14 @@ public PublishClusterStateStats stats() {
);
}

public PersistedStateStats getFullDownloadStats() {
return remoteClusterStateService.getFullDownloadStats();
}

public PersistedStateStats getDiffDownloadStats() {
return remoteClusterStateService.getDiffDownloadStats();
}

private PublishWithJoinResponse handleIncomingPublishRequest(BytesTransportRequest request) throws IOException {
try (StreamInput in = CompressedStreamUtils.decompressBytes(request, namedWriteableRegistry)) {
ClusterState incomingState;
Expand Down Expand Up @@ -231,69 +239,78 @@ private PublishWithJoinResponse handleIncomingPublishRequest(BytesTransportReque
}

// package private for testing
PublishWithJoinResponse handleIncomingRemotePublishRequest(RemotePublishRequest request) throws IOException {
if (transportService.getLocalNode().equals(request.getSourceNode())) {
return acceptRemoteStateOnLocalNode(request);
}
// TODO Make cluster state download non-blocking: https://github.com/opensearch-project/OpenSearch/issues/14102
ClusterMetadataManifest manifest = remoteClusterStateService.getClusterMetadataManifestByFileName(
request.getClusterUUID(),
request.getManifestFile()
);
if (manifest == null) {
throw new IllegalStateException("Publication failed as manifest was not found for " + request);
}
PublishWithJoinResponse handleIncomingRemotePublishRequest(RemotePublishRequest request) throws IOException, IllegalStateException {
boolean applyFullState = false;
final ClusterState lastSeen = lastSeenClusterState.get();
if (lastSeen == null) {
logger.debug(() -> "Diff cannot be applied as there is no last cluster state");
applyFullState = true;
} else if (manifest.getDiffManifest() == null) {
logger.trace(() -> "There is no diff in the manifest");
applyFullState = true;
} else if (manifest.getDiffManifest().getFromStateUUID().equals(lastSeen.stateUUID()) == false) {
logger.debug(() -> "Last cluster state not compatible with the diff");
applyFullState = true;
}

if (applyFullState == true) {
logger.debug(
() -> new ParameterizedMessage(
"Downloading full cluster state for term {}, version {}, stateUUID {}",
manifest.getClusterTerm(),
manifest.getStateVersion(),
manifest.getStateUUID()
)
);
ClusterState clusterState = remoteClusterStateService.getClusterStateForManifest(
request.getClusterName(),
manifest,
transportService.getLocalNode().getId(),
true
);
fullClusterStateReceivedCount.incrementAndGet();
final PublishWithJoinResponse response = acceptState(clusterState);
lastSeenClusterState.set(clusterState);
return response;
} else {
logger.debug(
() -> new ParameterizedMessage(
"Downloading diff cluster state for term {}, version {}, previousUUID {}, current UUID {}",
manifest.getClusterTerm(),
manifest.getStateVersion(),
manifest.getDiffManifest().getFromStateUUID(),
manifest.getStateUUID()
)
);
ClusterState clusterState = remoteClusterStateService.getClusterStateUsingDiff(
manifest,
lastSeen,
transportService.getLocalNode().getId()
try {
if (transportService.getLocalNode().equals(request.getSourceNode())) {
return acceptRemoteStateOnLocalNode(request);
}
// TODO Make cluster state download non-blocking: https://github.com/opensearch-project/OpenSearch/issues/14102
ClusterMetadataManifest manifest = remoteClusterStateService.getClusterMetadataManifestByFileName(
request.getClusterUUID(),
request.getManifestFile()
);
compatibleClusterStateDiffReceivedCount.incrementAndGet();
final PublishWithJoinResponse response = acceptState(clusterState);
lastSeenClusterState.compareAndSet(lastSeen, clusterState);
return response;
if (manifest == null) {
throw new IllegalStateException("Publication failed as manifest was not found for " + request);
}
final ClusterState lastSeen = lastSeenClusterState.get();
if (lastSeen == null) {
logger.debug(() -> "Diff cannot be applied as there is no last cluster state");
applyFullState = true;
} else if (manifest.getDiffManifest() == null) {
logger.debug(() -> "There is no diff in the manifest");
applyFullState = true;
} else if (manifest.getDiffManifest().getFromStateUUID().equals(lastSeen.stateUUID()) == false) {
logger.debug(() -> "Last cluster state not compatible with the diff");
applyFullState = true;
}

if (applyFullState == true) {
logger.debug(
() -> new ParameterizedMessage(
"Downloading full cluster state for term {}, version {}, stateUUID {}",
manifest.getClusterTerm(),
manifest.getStateVersion(),
manifest.getStateUUID()
)
);
ClusterState clusterState = remoteClusterStateService.getClusterStateForManifest(
request.getClusterName(),
manifest,
transportService.getLocalNode().getId(),
true
);
fullClusterStateReceivedCount.incrementAndGet();
final PublishWithJoinResponse response = acceptState(clusterState);
lastSeenClusterState.set(clusterState);
return response;
} else {
logger.debug(
() -> new ParameterizedMessage(
"Downloading diff cluster state for term {}, version {}, previousUUID {}, current UUID {}",
manifest.getClusterTerm(),
manifest.getStateVersion(),
manifest.getDiffManifest().getFromStateUUID(),
manifest.getStateUUID()
)
);
ClusterState clusterState = remoteClusterStateService.getClusterStateUsingDiff(
manifest,
lastSeen,
transportService.getLocalNode().getId()
);
compatibleClusterStateDiffReceivedCount.incrementAndGet();
final PublishWithJoinResponse response = acceptState(clusterState);
lastSeenClusterState.compareAndSet(lastSeen, clusterState);
return response;
}
} catch (Exception e) {
if (applyFullState) {
remoteClusterStateService.fullDownloadFailed();
} else {
remoteClusterStateService.diffDownloadFailed();
}
throw e;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -747,7 +747,7 @@ assert verifyManifestAndClusterState(manifestDetails.getClusterMetadataManifest(

@Override
public PersistedStateStats getStats() {
return remoteClusterStateService.getStats();
return remoteClusterStateService.getUploadStats();
}

private boolean verifyManifestAndClusterState(ClusterMetadataManifest manifest, ClusterState clusterState) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public RemoteClusterStateCleanupManager(
RemoteRoutingTableService remoteRoutingTableService
) {
this.remoteClusterStateService = remoteClusterStateService;
this.remoteStateStats = remoteClusterStateService.getStats();
this.remoteStateStats = remoteClusterStateService.getRemoteStateStats();
ClusterSettings clusterSettings = clusterService.getClusterSettings();
this.clusterApplierService = clusterService.getClusterApplierService();
this.staleFileCleanupInterval = clusterSettings.get(REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING);
Expand Down
Loading

0 comments on commit da098ed

Please sign in to comment.