Skip to content

Commit

Permalink
adding test for remote-cluster-service
Browse files Browse the repository at this point in the history
  • Loading branch information
rajiv-kv committed Aug 30, 2024
1 parent d5ec82d commit cce3ac5
Show file tree
Hide file tree
Showing 16 changed files with 179 additions and 149 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.discovery.Discovery;
import org.opensearch.gateway.remote.RemoteClusterStateService;
import org.opensearch.node.NodeClosedException;
import org.opensearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -84,7 +83,8 @@ public TransportClusterStateAction(
ThreadPool threadPool,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
@Nullable RemoteClusterStateService remoteClusterStateService) {
@Nullable RemoteClusterStateService remoteClusterStateService
) {
super(
ClusterStateAction.NAME,
false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@
import org.opensearch.cluster.NotClusterManagerException;
import org.opensearch.cluster.block.ClusterBlockException;
import org.opensearch.cluster.coordination.ClusterStateTermVersion;
import org.opensearch.cluster.coordination.Coordinator;
import org.opensearch.cluster.coordination.FailedToCommitClusterStateException;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.metadata.ProcessClusterEventTimeoutException;
Expand All @@ -65,7 +64,6 @@
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.discovery.ClusterManagerNotDiscoveredException;
import org.opensearch.discovery.Discovery;
import org.opensearch.gateway.remote.ClusterMetadataManifest;
import org.opensearch.gateway.remote.RemoteClusterStateService;
import org.opensearch.gateway.remote.RemoteManifestManager;
Expand Down Expand Up @@ -101,10 +99,6 @@ public abstract class TransportClusterManagerNodeAction<Request extends ClusterM
protected final ClusterService clusterService;
protected final IndexNameExpressionResolver indexNameExpressionResolver;

public void setRemoteClusterStateService(RemoteClusterStateService remoteClusterStateService) {
this.remoteClusterStateService = remoteClusterStateService;
}

protected RemoteClusterStateService remoteClusterStateService;

private final String executor;
Expand Down Expand Up @@ -427,7 +421,7 @@ public ClusterState getStateFromLocalNode(ClusterStateTermVersion termVersion) {
return appliedState;
}

ClusterState preCommitState = clusterService.commitState();
ClusterState preCommitState = clusterService.preCommitState();
if (preCommitState != null && termVersion.equals(new ClusterStateTermVersion(preCommitState))) {
logger.trace("Using the published state from local, ClusterStateTermVersion {}", termVersion);
return preCommitState;
Expand All @@ -439,16 +433,18 @@ public ClusterState getStateFromLocalNode(ClusterStateTermVersion termVersion) {
termVersion.getTerm(),
termVersion.getVersion()
);
ClusterMetadataManifest clusterMetadataManifest = remoteClusterStateService
.getClusterMetadataManifestByFileName(appliedState.stateUUID(), manifestFile);
ClusterMetadataManifest clusterMetadataManifest = remoteClusterStateService.getClusterMetadataManifestByFileName(
appliedState.metadata().clusterUUID(),
manifestFile
);
ClusterState clusterStateFromRemote = remoteClusterStateService.getClusterStateForManifest(
appliedState.getClusterName().value(),
clusterMetadataManifest,
appliedState.nodes().getLocalNode().getId(),
true
);

if(clusterStateFromRemote!=null) {
if (clusterStateFromRemote != null) {
logger.trace("Using the remote cluster-state fetched from local node, ClusterStateTermVersion {}", termVersion);
return clusterStateFromRemote;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,11 @@
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.block.ClusterBlockException;
import org.opensearch.cluster.coordination.ClusterStateTermVersion;
import org.opensearch.cluster.coordination.Coordinator;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.inject.Inject;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.discovery.Discovery;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;

Expand All @@ -42,7 +40,8 @@ public TransportGetTermVersionAction(
ClusterService clusterService,
ThreadPool threadPool,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver) {
IndexNameExpressionResolver indexNameExpressionResolver
) {
super(
GetTermVersionAction.NAME,
false,
Expand Down Expand Up @@ -77,7 +76,7 @@ protected void clusterManagerOperation(
ClusterState state,
ActionListener<GetTermVersionResponse> listener
) throws Exception {
ActionListener.completeWith(listener, () -> buildResponse(request, clusterService.commitState()));
ActionListener.completeWith(listener, () -> buildResponse(request, clusterService.preCommitState()));
}

private GetTermVersionResponse buildResponse(GetTermVersionRequest request, ClusterState state) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ private void handleApplyCommit(ApplyCommitRequest applyCommitRequest, ActionList
coordinationState.get().handleCommit(applyCommitRequest);
final ClusterState committedState = hideStateIfNotRecovered(coordinationState.get().getLastAcceptedState());
applierState = mode == Mode.CANDIDATE ? clusterStateWithNoClusterManagerBlock(committedState) : committedState;
setStateOnApplier(applierState);
clusterApplier.setPreCommitState(applierState);

if (applyCommitRequest.getSourceNode().equals(getLocalNode())) {
// cluster-manager node applies the committed state at the end of the publication process, not here.
Expand Down Expand Up @@ -479,10 +479,6 @@ && getCurrentTerm() == ZEN1_BWC_TERM
}
}

private void setStateOnApplier(ClusterState clusterState) {
clusterApplier.setCommitState(clusterState);
}

private static Optional<Join> joinWithDestination(Optional<Join> lastJoin, DiscoveryNode leader, long term) {
if (lastJoin.isPresent() && lastJoin.get().targetMatches(leader) && lastJoin.get().getTerm() == term) {
return lastJoin;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -527,16 +527,6 @@ private static void ensureRemoteStoreNodesCompatibility(DiscoveryNode joiningNod
reposToSkip.add(joiningNodeRepoName);
}
}
//if non-or1
//mix of remote-state (enabled and disabled)
//all of them settings

// publishes to all nodes

//commits

//30s still commit []


if (STRICT.equals(remoteStoreCompatibilityMode)) {
DiscoveryNode existingNode = remoteRoutingTableNode.orElseGet(() -> existingNodes.get(0));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,10 @@ public interface ClusterApplier {
void setInitialState(ClusterState initialState);

/**
* Sets the committed state for the applier.
* @param clusterState state that has been committed by cluster-manager
* Sets the pre-commit state for the applier.
* @param clusterState state that has been committed by coordinator to store
*/
void setCommitState(ClusterState clusterState);
void setPreCommitState(ClusterState clusterState);

/**
* Method to invoke when a new cluster state is available to be applied
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements

private final Collection<ClusterStateListener> clusterStateListeners = new CopyOnWriteArrayList<>();
private final Map<TimeoutClusterStateListener, NotifyTimeout> timeoutClusterStateListeners = new ConcurrentHashMap<>();
private final AtomicReference<ClusterState> preApplyState = new AtomicReference<>(); // last committed state which is yet to be applied
private final AtomicReference<ClusterState> preCommitState = new AtomicReference<>(); // last state which is yet to be applied
private final AtomicReference<ClusterState> state; // last applied state

private final String nodeName;
Expand Down Expand Up @@ -169,11 +169,6 @@ public void setInitialState(ClusterState initialState) {
state.set(initialState);
}

@Override
public void setCommitState(ClusterState clusterState) {
preApplyState.set(clusterState);
}

@Override
protected synchronized void doStart() {
Objects.requireNonNull(nodeConnectionsService, "please set the node connection service before starting");
Expand Down Expand Up @@ -238,10 +233,6 @@ public ClusterState state() {
return clusterState;
}

public ClusterState commitState() {
return preApplyState.get();
}

/**
* Returns true if the appliedClusterState is not null
*/
Expand Down Expand Up @@ -759,4 +750,18 @@ protected long currentTimeInMillis() {
protected boolean applicationMayFail() {
return false;
}

/**
* Pre-commit State of the cluster-applier
* @return ClusterState
*/
public ClusterState preCommitState() {
return preCommitState.get();
}

@Override
public void setPreCommitState(ClusterState clusterState) {
preCommitState.set(clusterState);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -183,8 +183,12 @@ public ClusterState state() {
return clusterApplierService.state();
}

public ClusterState commitState() {
return clusterApplierService.commitState();
/**
* The state that is persisted to store but may not be applied to cluster.
* @return ClusterState
*/
public ClusterState preCommitState() {
return clusterApplierService.preCommitState();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public ClusterState getState(String clusterName, ClusterMetadataManifest manifes
if (cache != null) {
ClusterStateTermVersion cacheStateTermVersion = new ClusterStateTermVersion(
new ClusterName(clusterName),
cache.stateUUID(),
cache.metadata().clusterUUID(),
cache.term(),
cache.version()
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1378,6 +1378,7 @@ public ClusterState getClusterStateUsingDiff(ClusterMetadataManifest manifest, C
throws IOException {
assert manifest.getDiffManifest() != null : "Diff manifest null which is required for downloading cluster state";
ClusterStateDiffManifest diff = manifest.getDiffManifest();
boolean includeEphemeral = true;

List<UploadedIndexMetadata> updatedIndices = diff.getIndicesUpdated().stream().map(idx -> {
Optional<UploadedIndexMetadata> uploadedIndexMetadataOptional = manifest.getIndices()
Expand Down Expand Up @@ -1425,8 +1426,7 @@ public ClusterState getClusterStateUsingDiff(ClusterMetadataManifest manifest, C
manifest.getDiffManifest() != null
&& manifest.getDiffManifest().getIndicesRoutingDiffPath() != null
&& !manifest.getDiffManifest().getIndicesRoutingDiffPath().isEmpty(),
true
);
includeEphemeral);
ClusterState.Builder clusterStateBuilder = ClusterState.builder(updatedClusterState);
Metadata.Builder metadataBuilder = Metadata.builder(updatedClusterState.metadata());
// remove the deleted indices from the metadata
Expand Down Expand Up @@ -1459,7 +1459,8 @@ public ClusterState getClusterStateUsingDiff(ClusterMetadataManifest manifest, C
.routingTable(new RoutingTable(manifest.getRoutingTableVersion(), indexRoutingTables))
.build();

// always includes all the fields of cluster-state (includeEphemeral=true)
assert includeEphemeral == true;
// newState includes all the fields of cluster-state (includeEphemeral=true always)
remoteClusterStateCache.putState(newState);
return newState;
}
Expand Down Expand Up @@ -1664,4 +1665,8 @@ public RemotePersistenceStats getStats() {
return remoteStateStats;
}

RemoteClusterStateCache getRemoteClusterStateCache() {
return remoteClusterStateCache;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -156,10 +156,6 @@ public void testGetTransportWithoutMatchingTerm() {
clusterService.state().version() - 1
)
);

{

}
capturingTransport.handleResponse(capturedRequest.requestId, termResp);

assertThat(capturingTransport.capturedRequests().length, equalTo(2));
Expand Down
Loading

0 comments on commit cce3ac5

Please sign in to comment.