Skip to content

Commit

Permalink
Simplify updated customs logic
Browse files Browse the repository at this point in the history
Signed-off-by: Sooraj Sinha <[email protected]>
  • Loading branch information
soosinha committed Jun 14, 2024
1 parent 79405ed commit 597057a
Show file tree
Hide file tree
Showing 7 changed files with 610 additions and 98 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,13 @@

import org.opensearch.action.LatchedActionListener;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.DiffableUtils;
import org.opensearch.cluster.DiffableUtils.NonDiffableValueSerializer;
import org.opensearch.common.CheckedRunnable;
import org.opensearch.common.remote.AbstractRemoteWritableBlobEntity;
import org.opensearch.common.remote.RemoteWritableEntityStore;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.gateway.remote.model.RemoteClusterBlocks;
import org.opensearch.gateway.remote.model.RemoteClusterStateBlobStore;
import org.opensearch.gateway.remote.model.RemoteClusterStateCustoms;
Expand All @@ -26,10 +27,9 @@
import org.opensearch.threadpool.ThreadPool;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
* A Manager which provides APIs to upload and download attributes of ClusterState to the {@link RemoteClusterStateBlobStore}
Expand Down Expand Up @@ -121,24 +121,41 @@ public CheckedRunnable<IOException> getAsyncMetadataReadAction(
LatchedActionListener<RemoteReadResult> listener
) {
final ActionListener actionListener = ActionListener.wrap(
response -> listener.onResponse(new RemoteReadResult((ToXContent) response, CLUSTER_STATE_ATTRIBUTE, component)),
response -> listener.onResponse(new RemoteReadResult(response, CLUSTER_STATE_ATTRIBUTE, component)),
listener::onFailure
);
return () -> getStore(blobEntity).readAsync(blobEntity, actionListener);
}

public Map<String, ClusterState.Custom> getUpdatedCustoms(ClusterState clusterState, ClusterState previousClusterState) {
Map<String, ClusterState.Custom> updatedCustoms = new HashMap<>();
Set<String> currentCustoms = new HashSet<>(clusterState.customs().keySet());
for (Map.Entry<String, ClusterState.Custom> entry : previousClusterState.customs().entrySet()) {
if (currentCustoms.contains(entry.getKey()) && !entry.getValue().equals(clusterState.customs().get(entry.getKey()))) {
updatedCustoms.put(entry.getKey(), clusterState.customs().get(entry.getKey()));
}
currentCustoms.remove(entry.getKey());
public DiffableUtils.MapDiff<String, ClusterState.Custom, Map<String, ClusterState.Custom>> getUpdatedCustoms(
ClusterState clusterState,
ClusterState previousClusterState,
boolean includeEphemeral,
boolean firstUploadForSplitGlobalMetadata
) {
if (!includeEphemeral) {
// When includeEphemeral is false, we do not want store any custom objects
return DiffableUtils.diff(
Collections.emptyMap(),
Collections.emptyMap(),
DiffableUtils.getStringKeySerializer(),
NonDiffableValueSerializer.getAbstractInstance()
);
}
for (String custom : currentCustoms) {
updatedCustoms.put(custom, clusterState.customs().get(custom));
if (firstUploadForSplitGlobalMetadata) {
// For first split global metadata upload, we want to upload all customs
return DiffableUtils.diff(
Collections.emptyMap(),
clusterState.customs(),
DiffableUtils.getStringKeySerializer(),
NonDiffableValueSerializer.getAbstractInstance()
);
}
return updatedCustoms;
return DiffableUtils.diff(
previousClusterState.customs(),
clusterState.customs(),
DiffableUtils.getStringKeySerializer(),
NonDiffableValueSerializer.getAbstractInstance()
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.opensearch.common.settings.Setting.Property;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
Expand Down Expand Up @@ -88,6 +89,7 @@

import static java.util.Collections.emptyList;
import static java.util.Collections.emptyMap;
import static org.opensearch.common.util.FeatureFlags.REMOTE_PUBLICATION_EXPERIMENTAL;
import static org.opensearch.gateway.PersistedClusterStateService.SLOW_WRITE_LOGGING_THRESHOLD;
import static org.opensearch.gateway.remote.ClusterMetadataManifest.CODEC_V2;
import static org.opensearch.gateway.remote.RemoteClusterStateAttributesManager.CLUSTER_BLOCKS;
Expand Down Expand Up @@ -159,6 +161,7 @@ public class RemoteClusterStateService implements Closeable {
private final String METADATA_UPDATE_LOG_STRING = "wrote metadata for [{}] indices and skipped [{}] unchanged "
+ "indices, coordination metadata updated : [{}], settings metadata updated : [{}], templates metadata "
+ "updated : [{}], custom metadata updated : [{}], indices routing updated : [{}]";
private final boolean isPublicationEnabled;

// ToXContent Params with gateway mode.
// We are using gateway context mode to persist all custom metadata.
Expand Down Expand Up @@ -201,6 +204,9 @@ public RemoteClusterStateService(
threadPool
);
this.remoteClusterStateCleanupManager = new RemoteClusterStateCleanupManager(this, clusterService, remoteRoutingTableService);
this.isPublicationEnabled = FeatureFlags.isEnabled(REMOTE_PUBLICATION_EXPERIMENTAL)
&& RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled(settings)
&& RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled(settings);
}

/**
Expand All @@ -221,15 +227,15 @@ public RemoteClusterStateManifestInfo writeFullMetadata(ClusterState clusterStat
clusterState,
new ArrayList<>(clusterState.metadata().indices().values()),
emptyMap(),
clusterState.metadata().customs(),
RemoteGlobalMetadataManager.filterCustoms(clusterState.metadata().customs(), isPublicationEnabled),
true,
true,
true,
true,
true,
true,
clusterState.customs(),
true,
isPublicationEnabled,
isPublicationEnabled,
isPublicationEnabled,
isPublicationEnabled ? clusterState.customs() : Collections.emptyMap(),
isPublicationEnabled,
remoteRoutingTableService.getIndicesRouting(clusterState.getRoutingTable())
);
final RemoteClusterStateManifestInfo manifestDetails = remoteManifestManager.uploadManifest(
Expand Down Expand Up @@ -276,37 +282,29 @@ public RemoteClusterStateManifestInfo writeIncrementalMetadata(
ClusterState clusterState,
ClusterMetadataManifest previousManifest
) throws IOException {
logger.info("WRITING INCREMENTAL STATE");

final long startTimeNanos = relativeTimeNanosSupplier.getAsLong();
if (clusterState.nodes().isLocalNodeElectedClusterManager() == false) {
logger.error("Local node is not elected cluster manager. Exiting");
return null;
}
assert previousClusterState.metadata().coordinationMetadata().term() == clusterState.metadata().coordinationMetadata().term();

final Map<String, UploadedMetadataAttribute> customsToBeDeletedFromRemote = new HashMap<>(previousManifest.getCustomMetadataMap());
final Map<String, Metadata.Custom> customsToUpload = remoteGlobalMetadataManager.getUpdatedCustoms(
clusterState,
previousClusterState
);
final Map<String, UploadedMetadataAttribute> clusterStateCustomsToBeDeleted = new HashMap<>(
boolean firstUploadForSplitGlobalMetadata = !previousManifest.hasMetadataAttributesFiles();

final DiffableUtils.MapDiff<String, Metadata.Custom, Map<String, Metadata.Custom>> customsDiff = remoteGlobalMetadataManager
.getCustomsDiff(clusterState, previousClusterState, isPublicationEnabled, firstUploadForSplitGlobalMetadata);
final DiffableUtils.MapDiff<String, ClusterState.Custom, Map<String, ClusterState.Custom>> clusterStateCustomsDiff =
remoteClusterStateAttributesManager.getUpdatedCustoms(
clusterState,
previousClusterState,
isPublicationEnabled,
firstUploadForSplitGlobalMetadata
);
final Map<String, UploadedMetadataAttribute> allUploadedCustomMap = new HashMap<>(previousManifest.getCustomMetadataMap());
final Map<String, UploadedMetadataAttribute> allUploadedClusterStateCustomsMap = new HashMap<>(
previousManifest.getClusterStateCustomMap()
);
final Map<String, ClusterState.Custom> clusterStateCustomsToUpload = remoteClusterStateAttributesManager.getUpdatedCustoms(
clusterState,
previousClusterState
);
final Map<String, UploadedMetadataAttribute> allUploadedCustomMap = new HashMap<>(previousManifest.getCustomMetadataMap());
for (final String custom : clusterState.metadata().customs().keySet()) {
// remove all the customs which are present currently
customsToBeDeletedFromRemote.remove(custom);
}
final Map<String, IndexMetadata> indicesToBeDeletedFromRemote = new HashMap<>(previousClusterState.metadata().indices());
for (final String custom : clusterState.customs().keySet()) {
// remove all the custom which are present currently
clusterStateCustomsToBeDeleted.remove(custom);
}
int numIndicesUpdated = 0;
int numIndicesUnchanged = 0;
final Map<String, ClusterMetadataManifest.UploadedIndexMetadata> allUploadedIndexMetadata = previousManifest.getIndices()
Expand Down Expand Up @@ -337,15 +335,14 @@ public RemoteClusterStateManifestInfo writeIncrementalMetadata(
indicesToBeDeletedFromRemote.remove(indexMetadata.getIndex().getName());
}

DiffableUtils.MapDiff<String, IndexRoutingTable, Map<String, IndexRoutingTable>> routingTableDiff = remoteRoutingTableService
final DiffableUtils.MapDiff<String, IndexRoutingTable, Map<String, IndexRoutingTable>> routingTableDiff = remoteRoutingTableService
.getIndicesRoutingMapDiff(previousClusterState.getRoutingTable(), clusterState.getRoutingTable());
List<IndexRoutingTable> indicesRoutingToUpload = new ArrayList<>();
final List<IndexRoutingTable> indicesRoutingToUpload = new ArrayList<>();
routingTableDiff.getUpserts().forEach((k, v) -> indicesRoutingToUpload.add(v));

UploadedMetadataResults uploadedMetadataResults;
// For migration case from codec V0 or V1 to V2, we have added null check on metadata attribute files,
// If file is empty and codec is 1 then write global metadata.
boolean firstUploadForSplitGlobalMetadata = !previousManifest.hasMetadataAttributesFiles();
boolean updateCoordinationMetadata = firstUploadForSplitGlobalMetadata
|| Metadata.isCoordinationMetadataEqual(previousClusterState.metadata(), clusterState.metadata()) == false;
;
Expand All @@ -355,24 +352,25 @@ public RemoteClusterStateManifestInfo writeIncrementalMetadata(
|| Metadata.isTransientSettingsMetadataEqual(previousClusterState.metadata(), clusterState.metadata()) == false;
boolean updateTemplatesMetadata = firstUploadForSplitGlobalMetadata
|| Metadata.isTemplatesMetadataEqual(previousClusterState.metadata(), clusterState.metadata()) == false;
// ToDo: check if these needs to be updated or not
final boolean updateDiscoveryNodes = clusterState.getNodes().delta(previousClusterState.getNodes()).hasChanges();
final boolean updateClusterBlocks = !clusterState.blocks().equals(previousClusterState.blocks());
final boolean updateHashesOfConsistentSettings = firstUploadForSplitGlobalMetadata

final boolean updateDiscoveryNodes = isPublicationEnabled
&& clusterState.getNodes().delta(previousClusterState.getNodes()).hasChanges();
final boolean updateClusterBlocks = isPublicationEnabled && !clusterState.blocks().equals(previousClusterState.blocks());
final boolean updateHashesOfConsistentSettings = isPublicationEnabled && firstUploadForSplitGlobalMetadata
|| Metadata.isHashesOfConsistentSettingsEqual(previousClusterState.metadata(), clusterState.metadata()) == false;

uploadedMetadataResults = writeMetadataInParallel(
clusterState,
toUpload,
prevIndexMetadataByName,
firstUploadForSplitGlobalMetadata ? clusterState.metadata().customs() : customsToUpload,
customsDiff.getUpserts(),
updateCoordinationMetadata,
updateSettingsMetadata,
updateTemplatesMetadata,
updateDiscoveryNodes,
updateClusterBlocks,
updateTransientSettingsMetadata,
clusterStateCustomsToUpload,
clusterStateCustomsDiff.getUpserts(),
updateHashesOfConsistentSettings,
indicesRoutingToUpload
);
Expand All @@ -382,10 +380,11 @@ public RemoteClusterStateManifestInfo writeIncrementalMetadata(
uploadedIndexMetadata -> allUploadedIndexMetadata.put(uploadedIndexMetadata.getIndexName(), uploadedIndexMetadata)
);
allUploadedCustomMap.putAll(uploadedMetadataResults.uploadedCustomMetadataMap);
allUploadedClusterStateCustomsMap.putAll(uploadedMetadataResults.uploadedClusterStateCustomMetadataMap);
// remove the data for removed custom/indices
customsToBeDeletedFromRemote.keySet().forEach(allUploadedCustomMap::remove);
customsDiff.getDeletes().forEach(allUploadedCustomMap::remove);
indicesToBeDeletedFromRemote.keySet().forEach(allUploadedIndexMetadata::remove);
clusterStateCustomsToBeDeleted.keySet().forEach(allUploadedCustomMap::remove);
clusterStateCustomsDiff.getDeletes().forEach(allUploadedClusterStateCustomsMap::remove);

if (!updateCoordinationMetadata) {
uploadedMetadataResults.uploadedCoordinationMetadata = previousManifest.getCoordinationMetadata();
Expand All @@ -408,22 +407,15 @@ public RemoteClusterStateManifestInfo writeIncrementalMetadata(
if (!updateHashesOfConsistentSettings && !firstUploadForSplitGlobalMetadata) {
uploadedMetadataResults.uploadedHashesOfConsistentSettings = previousManifest.getHashesOfConsistentSettings();
}
if (!firstUploadForSplitGlobalMetadata && customsToUpload.isEmpty()) {
uploadedMetadataResults.uploadedCustomMetadataMap = previousManifest.getCustomMetadataMap();
}
if (!firstUploadForSplitGlobalMetadata && clusterStateCustomsToUpload.isEmpty()) {
uploadedMetadataResults.uploadedClusterStateCustomMetadataMap = previousManifest.getClusterStateCustomMap();
}
uploadedMetadataResults.uploadedCustomMetadataMap = allUploadedCustomMap;
uploadedMetadataResults.uploadedClusterStateCustomMetadataMap = allUploadedClusterStateCustomsMap;
uploadedMetadataResults.uploadedIndexMetadata = new ArrayList<>(allUploadedIndexMetadata.values());

List<ClusterMetadataManifest.UploadedIndexMetadata> allUploadedIndicesRouting = new ArrayList<>();
allUploadedIndicesRouting = remoteRoutingTableService.getAllUploadedIndicesRouting(
uploadedMetadataResults.uploadedIndicesRoutingMetadata = remoteRoutingTableService.getAllUploadedIndicesRouting(
previousManifest,
uploadedMetadataResults.uploadedIndicesRoutingMetadata,
routingTableDiff.getDeletes()
);
uploadedMetadataResults.uploadedIndicesRoutingMetadata = allUploadedIndicesRouting;

final RemoteClusterStateManifestInfo manifestDetails = remoteManifestManager.uploadManifest(
clusterState,
Expand All @@ -448,7 +440,7 @@ public RemoteClusterStateManifestInfo writeIncrementalMetadata(
updateCoordinationMetadata,
updateSettingsMetadata,
updateTemplatesMetadata,
customsToUpload.size(),
customsDiff.getUpserts().size(),
indicesRoutingToUpload.size()
);
if (durationMillis >= slowWriteLoggingThreshold.getMillis()) {
Expand All @@ -464,7 +456,7 @@ public RemoteClusterStateManifestInfo writeIncrementalMetadata(
updateCoordinationMetadata,
updateSettingsMetadata,
updateTemplatesMetadata,
customsToUpload.size()
customsDiff.getUpserts().size()
);
} else {
logger.info("{}; {}", clusterStateUploadTimeMessage, metadataUpdateMessage);
Expand All @@ -479,7 +471,7 @@ public RemoteClusterStateManifestInfo writeIncrementalMetadata(
updateCoordinationMetadata,
updateSettingsMetadata,
updateTemplatesMetadata,
customsToUpload.size()
customsDiff.getUpserts().size()
);
}
return manifestDetails;
Expand Down
Loading

0 comments on commit 597057a

Please sign in to comment.