Skip to content

Commit

Permalink
SNAPSHOT: Introduce ClusterState Tests
Browse files Browse the repository at this point in the history
* Bringing in cluster state test infrastructure
* Relates elastic#32265
  • Loading branch information
original-brownbear committed Nov 23, 2018
1 parent fb09f20 commit 0c5b3e5
Show file tree
Hide file tree
Showing 3 changed files with 125 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ private static void validate(final String repositoryName, final String snapshotN
* @param partial allow partial snapshots
* @param userCreateSnapshotListener listener
*/
private void beginSnapshot(final ClusterState clusterState,
public void beginSnapshot(final ClusterState clusterState,
final SnapshotsInProgress.Entry snapshot,
final boolean partial,
final CreateSnapshotListener userCreateSnapshotListener) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,20 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.admin.cluster.repositories.delete.DeleteRepositoryRequest;
import org.elasticsearch.action.admin.cluster.repositories.delete.TransportDeleteRepositoryAction;
import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryRequest;
import org.elasticsearch.action.admin.cluster.repositories.put.TransportPutRepositoryAction;
import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteRequest;
import org.elasticsearch.action.admin.cluster.reroute.TransportClusterRerouteAction;
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRequest;
import org.elasticsearch.action.admin.cluster.snapshots.create.TransportCreateSnapshotAction;
import org.elasticsearch.action.admin.cluster.snapshots.delete.DeleteSnapshotRequest;
import org.elasticsearch.action.admin.cluster.snapshots.delete.TransportDeleteSnapshotAction;
import org.elasticsearch.action.admin.indices.close.CloseIndexRequest;
import org.elasticsearch.action.admin.indices.close.TransportCloseIndexAction;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
Expand All @@ -47,6 +56,7 @@
import org.elasticsearch.cluster.ClusterStateTaskExecutor.ClusterTasksResult;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.EmptyClusterInfoService;
import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction.FailedShardEntry;
import org.elasticsearch.cluster.action.shard.ShardStateAction.StartedShardEntry;
Expand Down Expand Up @@ -82,6 +92,10 @@
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.shard.IndexEventListener;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.repositories.fs.FsRepository;
import org.elasticsearch.snapshots.SnapshotsService;
import org.elasticsearch.test.gateway.TestGatewayAllocator;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.Transport;
Expand All @@ -91,8 +105,12 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;

import static com.carrotsearch.randomizedtesting.RandomizedTest.getRandom;
Expand All @@ -109,6 +127,7 @@
public class ClusterStateChanges {
private static final Settings SETTINGS = Settings.builder()
.put(PATH_HOME_SETTING.getKey(), "dummy")
.put(Environment.PATH_REPO_SETTING.getKey(), LuceneTestCase.createTempDir("repos"))
.build();

private static final Logger logger = LogManager.getLogger(ClusterStateChanges.class);
Expand All @@ -128,6 +147,12 @@ public class ClusterStateChanges {
private final ZenDiscovery.NodeRemovalClusterStateTaskExecutor nodeRemovalExecutor;
private final NodeJoinController.JoinTaskExecutor joinTaskExecutor;

private final SnapshotsService snapshotsService;
private final TransportPutRepositoryAction transportPutRepositoryAction;
private final TransportDeleteRepositoryAction transportDeleteRepositoryAction;
private final TransportCreateSnapshotAction transportCreateSnapshotAction;
private final TransportDeleteSnapshotAction transportDeleteSnapshotAction;

public ClusterStateChanges(NamedXContentRegistry xContentRegistry, ThreadPool threadPool) {
ClusterSettings clusterSettings = new ClusterSettings(SETTINGS, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
allocationService = new AllocationService(new AllocationDeciders(
Expand Down Expand Up @@ -206,6 +231,31 @@ allocationService, new AliasValidator(), environment,
nodeRemovalExecutor = new ZenDiscovery.NodeRemovalClusterStateTaskExecutor(allocationService, electMasterService,
s -> { throw new AssertionError("rejoin not implemented"); }, logger);
joinTaskExecutor = new NodeJoinController.JoinTaskExecutor(allocationService, electMasterService, logger);

Map<String, Repository.Factory> factories = new HashMap<>();
factories.put(FsRepository.TYPE, (metadata) -> new FsRepository(metadata, environment, xContentRegistry) {
@Override
protected void assertSnapshotOrGenericThread() {
// we don't execute on the correct thread here
}
});
RepositoriesService repositoriesService = new RepositoriesService(SETTINGS, clusterService, transportService, factories,
threadPool);
transportPutRepositoryAction = new TransportPutRepositoryAction(transportService, clusterService, repositoriesService,
threadPool, actionFilters, indexNameExpressionResolver);
transportDeleteRepositoryAction = new TransportDeleteRepositoryAction(transportService, clusterService,
repositoriesService, threadPool, actionFilters, indexNameExpressionResolver);
snapshotsService = new SnapshotsService(SETTINGS, clusterService, indexNameExpressionResolver, repositoriesService, threadPool);
transportCreateSnapshotAction = new TransportCreateSnapshotAction(
transportService, clusterService, threadPool, snapshotsService, actionFilters, indexNameExpressionResolver
);
transportDeleteSnapshotAction = new TransportDeleteSnapshotAction(
transportService, clusterService, threadPool, snapshotsService, actionFilters, indexNameExpressionResolver
);
}

public Settings getSettings() {
return SETTINGS;
}

public ClusterState createIndex(ClusterState state, CreateIndexRequest request) {
Expand Down Expand Up @@ -265,6 +315,35 @@ public ClusterState applyStartedShards(ClusterState clusterState, List<ShardRout
return runTasks(shardStartedClusterStateTaskExecutor, clusterState, entries);
}

public ClusterState putRepository(ClusterState state, PutRepositoryRequest request) {
return execute(transportPutRepositoryAction, request, state);
}

public ClusterState deleteRepository(ClusterState state, DeleteRepositoryRequest request) {
return execute(transportDeleteRepositoryAction, request, state);
}

public ClusterState createSnapshot(ClusterState state, CreateSnapshotRequest request) {
return execute(transportCreateSnapshotAction, request, state);
}

public ClusterState deleteSnapshot(ClusterState state, DeleteSnapshotRequest request) {
return execute(transportDeleteSnapshotAction, request, state);
}

public ClusterState beginSnapshot(ClusterState state, SnapshotsInProgress.Entry snapshot, boolean partial) {
return executeClusterStateUpdateTask(state, () ->
snapshotsService.beginSnapshot(state, snapshot, partial, new SnapshotsService.CreateSnapshotListener() {
@Override
public void onResponse() {
}

@Override
public void onFailure(Exception e) {
}
}));
}

private <T> ClusterState runTasks(ClusterStateTaskExecutor<T> executor, ClusterState clusterState, List<T> entries) {
try {
ClusterTasksResult<T> result = executor.execute(clusterState, entries);
Expand All @@ -291,14 +370,21 @@ private <Request extends MasterNodeRequest<Request>, Response extends ActionResp
}

private ClusterState executeClusterStateUpdateTask(ClusterState state, Runnable runnable) {
ClusterState[] result = new ClusterState[1];
when(clusterService.state()).thenReturn(state);
CompletableFuture<ClusterState> clusterStateResult = new CompletableFuture<>();
doAnswer(invocationOnMock -> {
ClusterStateUpdateTask task = (ClusterStateUpdateTask)invocationOnMock.getArguments()[1];
result[0] = task.execute(state);
clusterStateResult.complete(task.execute(state));
return null;
}).when(clusterService).submitStateUpdateTask(anyString(), any(ClusterStateUpdateTask.class));
runnable.run();
assertThat(result[0], notNullValue());
return result[0];
final ClusterState result;
try {
result = clusterStateResult.get();
} catch (InterruptedException | ExecutionException e) {
throw new AssertionError(e);
}
assertThat(result, notNullValue());
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@

import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryRequest;
import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteRequest;
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRequest;
import org.elasticsearch.action.admin.cluster.snapshots.delete.DeleteSnapshotRequest;
import org.elasticsearch.action.admin.indices.close.CloseIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
Expand All @@ -31,6 +34,7 @@
import org.elasticsearch.action.support.replication.ClusterStateCreationUtils;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlocks;
Expand All @@ -43,10 +47,13 @@
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.allocation.FailedShard;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.discovery.DiscoverySettings;
import org.elasticsearch.env.Environment;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.PrimaryReplicaSyncer;
import org.elasticsearch.index.shard.ShardId;
Expand Down Expand Up @@ -99,6 +106,33 @@ public void tearDown() throws Exception {
terminate(threadPool);
}

public void testRepository() {
ClusterState state = ClusterStateCreationUtils.state(3, new String[]{"test-index"}, randomIntBetween(1, 5));
state = cluster.putRepository(
state,
new PutRepositoryRequest("test-repo").type("fs").verify(false)
.settings(
"{\"location\": \"" + Environment.PATH_REPO_SETTING.get(cluster.getSettings()).get(0) + "\"}",
XContentType.JSON
)
);
state = cluster.createSnapshot(state, new CreateSnapshotRequest("test-repo", "test-snap"));
assertNotNull(state.custom(SnapshotsInProgress.TYPE));
SnapshotsInProgress.Entry firstEntry = ((SnapshotsInProgress) state.custom(SnapshotsInProgress.TYPE)).entries().get(0);
assertEquals("test-snap", firstEntry.snapshot().getSnapshotId().getName());
assertEquals(SnapshotsInProgress.State.INIT, firstEntry.state());
state = cluster.beginSnapshot(state, firstEntry, false);
SnapshotsInProgress.Entry secondEntry = ((SnapshotsInProgress) state.custom(SnapshotsInProgress.TYPE)).entries().get(0);
assertEquals(Strings.toString(state, true, true), SnapshotsInProgress.State.STARTED, secondEntry.state());
final ClusterState finalState = state;
expectThrows(RuntimeException.class, () -> cluster.deleteIndices(finalState, new DeleteIndexRequest("test-index")));
state = cluster.deleteSnapshot(state, new DeleteSnapshotRequest("test-repo", "test-snap"));
assertNotNull(state.custom(SnapshotsInProgress.TYPE));
// TODO: This currently leaves one entry that is marked aborted behind
// assertEquals(0, ((SnapshotsInProgress) state.custom(SnapshotsInProgress.TYPE)).entries().size());
// cluster.deleteRepository(state, new DeleteRepositoryRequest("test-repo"));
}

public void testRandomClusterStateUpdates() {
// we have an IndicesClusterStateService per node in the cluster
final Map<DiscoveryNode, IndicesClusterStateService> clusterStateServiceMap = new HashMap<>();
Expand Down

0 comments on commit 0c5b3e5

Please sign in to comment.