Skip to content

Commit

Permalink
using remote cluster-state as fallback
Browse files Browse the repository at this point in the history
  • Loading branch information
rajiv-kv committed Aug 26, 2024
1 parent dac6460 commit d87c895
Show file tree
Hide file tree
Showing 10 changed files with 145 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,12 @@
import org.opensearch.cluster.metadata.Metadata.Custom;
import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Nullable;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.gateway.remote.RemoteClusterStateService;
import org.opensearch.node.NodeClosedException;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;
Expand Down Expand Up @@ -80,7 +82,8 @@ public TransportClusterStateAction(
ClusterService clusterService,
ThreadPool threadPool,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver
IndexNameExpressionResolver indexNameExpressionResolver,
@Nullable RemoteClusterStateService remoteClusterStateService
) {
super(
ClusterStateAction.NAME,
Expand All @@ -93,6 +96,7 @@ public TransportClusterStateAction(
indexNameExpressionResolver
);
this.localExecuteSupported = true;
this.remoteClusterStateService = remoteClusterStateService;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.opensearch.cluster.ClusterStateObserver;
import org.opensearch.cluster.NotClusterManagerException;
import org.opensearch.cluster.block.ClusterBlockException;
import org.opensearch.cluster.coordination.ClusterStateTermVersion;
import org.opensearch.cluster.coordination.FailedToCommitClusterStateException;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.metadata.ProcessClusterEventTimeoutException;
Expand All @@ -63,6 +64,9 @@
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.discovery.ClusterManagerNotDiscoveredException;
import org.opensearch.gateway.remote.ClusterMetadataManifest;
import org.opensearch.gateway.remote.RemoteClusterStateService;
import org.opensearch.gateway.remote.RemoteManifestManager;
import org.opensearch.node.NodeClosedException;
import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType;
import org.opensearch.tasks.Task;
Expand Down Expand Up @@ -94,6 +98,7 @@ public abstract class TransportClusterManagerNodeAction<Request extends ClusterM
protected final TransportService transportService;
protected final ClusterService clusterService;
protected final IndexNameExpressionResolver indexNameExpressionResolver;
protected RemoteClusterStateService remoteClusterStateService;

private final String executor;

Expand Down Expand Up @@ -378,11 +383,15 @@ public void handleResponse(GetTermVersionResponse response) {
response.getClusterStateTermVersion(),
isLatestClusterStatePresentOnLocalNode
);
if (isLatestClusterStatePresentOnLocalNode) {
onLatestLocalState.accept(clusterState);

ClusterState stateFromNode = getStateFromLocalNode(response.getClusterStateTermVersion());
if (stateFromNode != null) {
onLatestLocalState.accept(stateFromNode);
} else {
// fallback to clusterManager
onStaleLocalState.accept(clusterManagerNode, clusterState);
}

}

@Override
Expand All @@ -405,6 +414,37 @@ public GetTermVersionResponse read(StreamInput in) throws IOException {
};
}

public ClusterState getStateFromLocalNode(ClusterStateTermVersion termVersion) {
ClusterState appliedState = clusterService.state();
if (termVersion.equals(new ClusterStateTermVersion(appliedState))) {
return appliedState;
}
ClusterState publishState = clusterService.publishState();
if (publishState != null && termVersion.equals(new ClusterStateTermVersion(publishState))) {
return publishState;

Check warning on line 424 in server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java#L424

Added line #L424 was not covered by tests
}
if (remoteClusterStateService != null) {
try {
String manifestFile = RemoteManifestManager.getManifestFilePrefixForTermVersion(
termVersion.getTerm(),
termVersion.getVersion()

Check warning on line 430 in server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java#L428-L430

Added lines #L428 - L430 were not covered by tests
);
ClusterMetadataManifest clusterMetadataManifestByFileName = remoteClusterStateService
.getClusterMetadataManifestByFileName(appliedState.getClusterName().value(), manifestFile);
ClusterState clusterStateForManifest = remoteClusterStateService.getClusterStateForManifest(
appliedState.getClusterName().value(),

Check warning on line 435 in server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java#L432-L435

Added lines #L432 - L435 were not covered by tests
clusterMetadataManifestByFileName,
appliedState.nodes().getLocalNode().getId(),

Check warning on line 437 in server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java#L437

Added line #L437 was not covered by tests
true
);
return clusterStateForManifest;
} catch (IOException e) {

Check warning on line 441 in server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java#L440-L441

Added lines #L440 - L441 were not covered by tests

}
}
return null;
}

private boolean checkForBlock(Request request, ClusterState localClusterState) {
final ClusterBlockException blockException = checkBlock(request, localClusterState);
if (blockException != null) {
Expand Down Expand Up @@ -510,4 +550,9 @@ protected String getMasterActionName(DiscoveryNode node) {
protected boolean localExecuteSupportedByAction() {
// Default answer is always false.
// NOTE(review): presumably subclasses override this to opt in to local execution - confirm against callers.
return false;
}

/**
 * Replaces the remote cluster state service held by this action.
 * When the service is non-null, getStateFromLocalNode uses it as a last-resort source for the requested state.
 *
 * @param remoteClusterStateService service to use for remote lookups; {@code null} disables the remote lookup
 */
public void setRemoteClusterStateService(RemoteClusterStateService remoteClusterStateService) {
this.remoteClusterStateService = remoteClusterStateService;
}

Check warning on line 556 in server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java#L555-L556

Added lines #L555 - L556 were not covered by tests

}
Original file line number Diff line number Diff line change
Expand Up @@ -464,6 +464,8 @@ && getCurrentTerm() == ZEN1_BWC_TERM
ensureTermAtLeast(sourceNode, publishRequest.getAcceptedState().term());
final PublishResponse publishResponse = coordinationState.get().handlePublishRequest(publishRequest);

setStateOnApplier(coordinationState.get().getLastAcceptedState());

if (sourceNode.equals(getLocalNode())) {
preVoteCollector.update(getPreVoteResponse(), getLocalNode());
} else {
Expand All @@ -477,6 +479,14 @@ && getCurrentTerm() == ZEN1_BWC_TERM
}
}

/**
 * Hands the given cluster state to the cluster applier as its publish state.
 * <p>
 * The state is first passed through {@code hideStateIfNotRecovered}; while this node is in
 * {@code CANDIDATE} mode it is additionally passed through
 * {@code clusterStateWithNoClusterManagerBlock} before being set on the applier.
 *
 * @param clusterState state to expose through the applier
 */
private void setStateOnApplier(ClusterState clusterState) {
    ClusterState stateForApplier = hideStateIfNotRecovered(clusterState);
    if (mode == Mode.CANDIDATE) {
        stateForApplier = clusterStateWithNoClusterManagerBlock(stateForApplier);
    }
    clusterApplier.setPublishState(stateForApplier);
}

private static Optional<Join> joinWithDestination(Optional<Join> lastJoin, DiscoveryNode leader, long term) {
if (lastJoin.isPresent() && lastJoin.get().targetMatches(leader) && lastJoin.get().getTerm() == term) {
return lastJoin;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,12 @@ public interface ClusterApplier {
*/
void setInitialState(ClusterState initialState);

/**
* Sets the publish state for the applier
* @param clusterState state published by cluster-manager
*/
void setPublishState(ClusterState clusterState);

/**
* Method to invoke when a new cluster state is available to be applied
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements

private final Collection<ClusterStateListener> clusterStateListeners = new CopyOnWriteArrayList<>();
private final Map<TimeoutClusterStateListener, NotifyTimeout> timeoutClusterStateListeners = new ConcurrentHashMap<>();

private final AtomicReference<ClusterState> publishState = new AtomicReference<>(); // last published state
private final AtomicReference<ClusterState> state; // last applied state

private final String nodeName;
Expand Down Expand Up @@ -169,6 +169,11 @@ public void setInitialState(ClusterState initialState) {
state.set(initialState);
}

@Override
public void setPublishState(ClusterState clusterState) {
// Stored in an AtomicReference; read back through publishState().
publishState.set(clusterState);
}

@Override
protected synchronized void doStart() {
Objects.requireNonNull(nodeConnectionsService, "please set the node connection service before starting");
Expand Down Expand Up @@ -233,6 +238,10 @@ public ClusterState state() {
return clusterState;
}

/**
 * Returns the cluster state most recently passed to {@code setPublishState}.
 *
 * @return the last published state, or {@code null} if no publish state has been set yet
 */
public ClusterState publishState() {
return publishState.get();
}

/**
* Returns true if the appliedClusterState is not null
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,10 @@ public ClusterState state() {
return clusterApplierService.state();
}

/**
 * Returns the publish state held by the cluster applier service.
 * Delegates to {@code clusterApplierService.publishState()}.
 *
 * @return the applier's publish state; may be {@code null} when the applier has none
 */
public ClusterState publishState() {
return clusterApplierService.publishState();
}

/**
* Adds a high priority applier of updated cluster states.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.opensearch.cluster.Diff;
import org.opensearch.cluster.DiffableUtils;
import org.opensearch.cluster.block.ClusterBlocks;
import org.opensearch.cluster.coordination.ClusterStateTermVersion;
import org.opensearch.cluster.coordination.CoordinationMetadata;
import org.opensearch.cluster.metadata.DiffableStringMap;
import org.opensearch.cluster.metadata.IndexMetadata;
Expand All @@ -32,6 +33,7 @@
import org.opensearch.cluster.routing.remote.RemoteRoutingTableServiceFactory;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Nullable;
import org.opensearch.common.annotation.InternalApi;
import org.opensearch.common.blobstore.BlobContainer;
import org.opensearch.common.blobstore.BlobStore;
import org.opensearch.common.settings.ClusterSettings;
Expand Down Expand Up @@ -114,6 +116,7 @@
*
* @opensearch.internal
*/
@InternalApi
public class RemoteClusterStateService implements Closeable {

private static final Logger logger = LogManager.getLogger(RemoteClusterStateService.class);
Expand Down Expand Up @@ -979,6 +982,8 @@ BlobStore getBlobStore() {
return blobStoreRepository.blobStore();
}

AtomicReference<ClusterState> lastDownloadState = new AtomicReference<>();

/**
* Fetch latest ClusterState from remote, including global metadata, index metadata and cluster state version
*
Expand All @@ -996,8 +1001,16 @@ public ClusterState getLatestClusterState(String clusterName, String clusterUUID
String.format(Locale.ROOT, "Latest cluster metadata manifest is not present for the provided clusterUUID: %s", clusterUUID)
);
}
ClusterStateTermVersion clusterStateTermVersion = new ClusterStateTermVersion(
new ClusterName(clusterName),
clusterUUID,
clusterMetadataManifest.get().getClusterTerm(),
clusterMetadataManifest.get().getStateVersion()
);

return getClusterStateForManifest(clusterName, clusterMetadataManifest.get(), nodeId, includeEphemeral);
ClusterState state = getClusterStateForManifest(clusterName, clusterMetadataManifest.get(), nodeId, includeEphemeral);
lastDownloadState.set(state);
return state;
}

// package private for testing
Expand Down Expand Up @@ -1311,8 +1324,29 @@ public ClusterState getClusterStateForManifest(
String localNodeId,
boolean includeEphemeral
) throws IOException {

ClusterStateTermVersion clusterStateTermVersion = new ClusterStateTermVersion(
new ClusterName(clusterName),
manifest.getClusterUUID(),
manifest.getClusterTerm(),
manifest.getStateVersion()
);
ClusterState lastState = lastDownloadState.get();
if (lastState != null) {
ClusterStateTermVersion lastStateTermVersion = new ClusterStateTermVersion(

Check warning on line 1336 in server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java#L1336

Added line #L1336 was not covered by tests
new ClusterName(clusterName),
lastState.stateUUID(),
lastState.term(),
lastState.version()

Check warning on line 1340 in server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java#L1338-L1340

Added lines #L1338 - L1340 were not covered by tests
);
if (clusterStateTermVersion.equals(lastStateTermVersion)) {
return lastState;

Check warning on line 1343 in server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java#L1343

Added line #L1343 was not covered by tests
}
}

ClusterState retState = null;
if (manifest.onOrAfterCodecVersion(CODEC_V2)) {
return readClusterStateInParallel(
retState = readClusterStateInParallel(
ClusterState.builder(new ClusterName(clusterName)).build(),
manifest,
manifest.getClusterUUID(),
Expand Down Expand Up @@ -1354,9 +1388,23 @@ public ClusterState getClusterStateForManifest(
);
Metadata.Builder mb = Metadata.builder(remoteGlobalMetadataManager.getGlobalMetadata(manifest.getClusterUUID(), manifest));
mb.indices(clusterState.metadata().indices());
return ClusterState.builder(clusterState).metadata(mb).build();
retState = ClusterState.builder(clusterState).metadata(mb).build();
}
setLastDownloadState(retState);
return retState;
}

private void setLastDownloadState(final ClusterState newState) {
lastDownloadState.getAndUpdate(oldState -> {
if (oldState == null) {
return newState;
}
if (newState.term() > oldState.term() && newState.version() > oldState.version()) {
return newState;

Check warning on line 1403 in server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java#L1403

Added line #L1403 was not covered by tests
} else {
return oldState;

Check warning on line 1405 in server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java#L1405

Added line #L1405 was not covered by tests
}
});
}

public ClusterState getClusterStateUsingDiff(ClusterMetadataManifest manifest, ClusterState previousState, String localNodeId)
Expand Down Expand Up @@ -1437,11 +1485,14 @@ public ClusterState getClusterStateUsingDiff(ClusterMetadataManifest manifest, C
indexRoutingTables.remove(indexName);
}

return clusterStateBuilder.stateUUID(manifest.getStateUUID())
final ClusterState newState = clusterStateBuilder.stateUUID(manifest.getStateUUID())
.version(manifest.getStateVersion())
.metadata(metadataBuilder)
.routingTable(new RoutingTable(manifest.getRoutingTableVersion(), indexRoutingTables))
.build();

setLastDownloadState(newState);
return newState;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ private List<BlobMetadata> getManifestFileNames(String clusterName, String clust
}
}

static String getManifestFilePrefixForTermVersion(long term, long version) {
public static String getManifestFilePrefixForTermVersion(long term, long version) {
return String.join(
DELIMITER,
RemoteClusterMetadataManifest.MANIFEST,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ public void setInitialState(ClusterState initialState) {

}

@Override
public void setPublishState(ClusterState clusterState) {
// No-op: this applier implementation does not keep the publish state.

}

@Override
public void onNewClusterState(String source, Supplier<ClusterState> clusterStateSupplier, ClusterApplyListener listener) {
listener.onSuccess(source);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2412,7 +2412,8 @@ public void onFailure(final Exception e) {
clusterService,
threadPool,
actionFilters,
indexNameExpressionResolver
indexNameExpressionResolver,
null
)
);
actions.put(
Expand Down

0 comments on commit d87c895

Please sign in to comment.