Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace InMemoryPersistedState with GatewayMetaState in CoordinatorTests #36897

Merged
merged 10 commits into from
Jan 15, 2019
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,13 @@ static ClusterState removeStateNotRecoveredBlock(final ClusterState state) {
.build();
}

public static ClusterState addStateNotRecoveredBlock(ClusterState state) {
return ClusterState.builder(state)
.blocks(ClusterBlocks.builder()
.blocks(state.blocks()).addGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK).build())
.build();
}

static ClusterState mixCurrentStateAndRecoveredState(final ClusterState currentState, final ClusterState recoveredState) {
assert currentState.metaData().indices().isEmpty();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateApplier;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.coordination.CoordinationState;
import org.elasticsearch.cluster.coordination.CoordinationState.PersistedState;
import org.elasticsearch.cluster.coordination.InMemoryPersistedState;
Expand Down Expand Up @@ -67,8 +66,6 @@
import java.util.function.Function;
import java.util.function.UnaryOperator;

import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK;

/**
* This class is responsible for storing/retrieving metadata to/from disk.
* When instance of this class is created, constructor ensures that this version is compatible with state stored on disk and performs
Expand Down Expand Up @@ -129,11 +126,9 @@ private void initializeClusterState(ClusterName clusterName) throws IOException
previousManifest = manifestAndMetaData.v1();

final MetaData metaData = manifestAndMetaData.v2();
final ClusterBlocks.Builder blocks = ClusterBlocks.builder().addGlobalBlock(STATE_NOT_RECOVERED_BLOCK);

previousClusterState = ClusterState.builder(clusterName)
.version(previousManifest.getClusterStateVersion())
.blocks(blocks.build())
.metaData(metaData).build();

logger.debug("took {} to load state", TimeValue.timeValueMillis(TimeValue.nsecToMSec(System.nanoTime() - startNS)));
Expand All @@ -144,6 +139,7 @@ public void applyClusterStateUpdaters() {
assert transportService.getLocalNode() != null : "transport service is not yet started";

previousClusterState = Function.<ClusterState>identity()
.andThen(ClusterStateUpdaters::addStateNotRecoveredBlock)
.andThen(state -> ClusterStateUpdaters.setLocalNode(state, transportService.getLocalNode()))
.andThen(state -> ClusterStateUpdaters.upgradeAndArchiveUnknownOrInvalidSettings(state, clusterService.getClusterSettings()))
.andThen(state -> ClusterStateUpdaters.closeBadIndices(state, indicesService))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,15 @@
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.discovery.zen.PublishClusterStateStats;
import org.elasticsearch.discovery.zen.UnicastHostsProvider.HostsResolver;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.gateway.MockGatewayMetaState;
import org.elasticsearch.indices.cluster.FakeThreadPoolMasterService;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.disruption.DisruptableMockTransport;
import org.elasticsearch.test.disruption.DisruptableMockTransport.ConnectionStatus;
import org.elasticsearch.transport.TransportService;
import org.hamcrest.Matcher;
import org.junit.After;
import org.junit.Before;

import java.io.IOException;
Expand Down Expand Up @@ -123,6 +126,16 @@

public class CoordinatorTests extends ESTestCase {

private final List<NodeEnvironment> nodeEnvironments = new ArrayList<>();

@After
public void closeNodeEnvironmentsAfterEachTest() {
for (NodeEnvironment nodeEnvironment : nodeEnvironments) {
nodeEnvironment.close();
}
nodeEnvironments.clear();
}

@Before
public void resetPortCounterBeforeEachTest() {
resetPortCounter();
Expand Down Expand Up @@ -1358,35 +1371,50 @@ ClusterNode getAnyNodePreferringLeaders() {
return getAnyNode();
}

class MockPersistedState extends InMemoryPersistedState {
MockPersistedState(long term, ClusterState acceptedState) {
super(term, acceptedState);
class MockPersistedState implements PersistedState {
private final PersistedState delegate;

MockPersistedState(Settings settings, DiscoveryNode localNode) throws IOException {
if (rarely()) {
NodeEnvironment nodeEnvironment = newNodeEnvironment();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: too much indent here?

nodeEnvironments.add(nodeEnvironment);
delegate = new MockGatewayMetaState(settings, nodeEnvironment, xContentRegistry(), localNode)
.getPersistedState(settings, null);
} else {
delegate = new InMemoryPersistedState(0L,
clusterState(0L, 0L, localNode, VotingConfiguration.EMPTY_CONFIG, VotingConfiguration.EMPTY_CONFIG, 0L));
}
}

private void possiblyFail(String description) {
if (disruptStorage && rarely()) {
// TODO revisit this when we've decided how PersistedState should throw exceptions
logger.trace("simulating IO exception [{}]", description);
if (randomBoolean()) {
throw new UncheckedIOException(new IOException("simulated IO exception [" + description + ']'));
} else {
throw new CoordinationStateRejectedException("simulated IO exception [" + description + ']');
}
// In the real-life IOError might be thrown, for example if state fsync fails.
// This will require node restart and we're not emulating it here.
throw new UncheckedIOException(new IOException("simulated IO exception [" + description + ']'));
}
}

@Override
public long getCurrentTerm() {
return delegate.getCurrentTerm();
}

@Override
public ClusterState getLastAcceptedState() {
return delegate.getLastAcceptedState();
}

@Override
public void setCurrentTerm(long currentTerm) {
possiblyFail("before writing term of " + currentTerm);
super.setCurrentTerm(currentTerm);
// TODO possiblyFail() here if that's a failure mode of the storage layer
delegate.setCurrentTerm(currentTerm);
}

@Override
public void setLastAcceptedState(ClusterState clusterState) {
possiblyFail("before writing last-accepted state of term=" + clusterState.term() + ", version=" + clusterState.version());
super.setLastAcceptedState(clusterState);
// TODO possiblyFail() here if that's a failure mode of the storage layer
delegate.setLastAcceptedState(clusterState);
}
}

Expand All @@ -1396,7 +1424,7 @@ class ClusterNode {
private final int nodeIndex;
private Coordinator coordinator;
private DiscoveryNode localNode;
private final PersistedState persistedState;
private PersistedState persistedState;
private FakeClusterApplier clusterApplier;
private AckedFakeThreadPoolMasterService masterService;
private TransportService transportService;
Expand All @@ -1406,8 +1434,6 @@ class ClusterNode {
ClusterNode(int nodeIndex, boolean masterEligible) {
this.nodeIndex = nodeIndex;
localNode = createDiscoveryNode(masterEligible);
persistedState = new MockPersistedState(0L,
clusterState(0L, 0L, localNode, VotingConfiguration.EMPTY_CONFIG, VotingConfiguration.EMPTY_CONFIG, 0L));
onNode(localNode, this::setUp).run();
}

Expand Down Expand Up @@ -1480,6 +1506,12 @@ protected void onBlackholedDuringSend(long requestId, String action, DiscoveryNo
Cluster.this::provideUnicastHosts, clusterApplier, Randomness.get());
masterService.setClusterStatePublisher(coordinator);

try {
persistedState = new MockPersistedState(settings, localNode);
} catch (IOException e) {
fail("Unable to create MockPersistedState");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we would like to see the caught exception here:

Suggested change
fail("Unable to create MockPersistedState");
throw new AssertionError("Unable to create MockPersistedState", e);

}

transportService.start();
transportService.acceptIncomingRequests();
masterService.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import java.util.stream.Stream;

import static org.elasticsearch.cluster.metadata.MetaData.CLUSTER_READ_ONLY_BLOCK;
import static org.elasticsearch.gateway.ClusterStateUpdaters.addStateNotRecoveredBlock;
import static org.elasticsearch.gateway.ClusterStateUpdaters.closeBadIndices;
import static org.elasticsearch.gateway.ClusterStateUpdaters.hideStateIfNotRecovered;
import static org.elasticsearch.gateway.ClusterStateUpdaters.mixCurrentStateAndRecoveredState;
Expand Down Expand Up @@ -182,6 +183,24 @@ public void testRemoveStateNotRecoveredBlock() {
assertFalse(newState.blocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK));
}

public void testAddStateNotRecoveredBlock() {
final MetaData.Builder metaDataBuilder = MetaData.builder()
.persistentSettings(Settings.builder().put("test", "test").build());
final IndexMetaData indexMetaData = createIndexMetaData("test", Settings.EMPTY);
metaDataBuilder.put(indexMetaData, false);

final ClusterState initialState = ClusterState
.builder(ClusterState.EMPTY_STATE)
.metaData(metaDataBuilder)
.build();
assertFalse(initialState.blocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK));

final ClusterState newState = addStateNotRecoveredBlock(initialState);

assertMetaDataEquals(initialState, newState);
assertTrue(newState.blocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK));
}

public void testCloseBadIndices() throws IOException {
final IndicesService indicesService = mock(IndicesService.class);
final IndexMetaData good = createIndexMetaData("good", Settings.EMPTY);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,49 +27,20 @@
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.Manifest;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.MetaDataIndexUpgradeService;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.plugins.MetaDataUpgrader;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.Collections;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.not;
import static org.mockito.Mockito.mock;

public class GatewayMetaStatePersistedStateTests extends ESTestCase {
private class GatewayMetaStateUT extends GatewayMetaState {
private final DiscoveryNode localNode;

GatewayMetaStateUT(Settings settings, NodeEnvironment nodeEnvironment, DiscoveryNode localNode) throws IOException {
super(settings, nodeEnvironment, new MetaStateService(nodeEnvironment, xContentRegistry()),
mock(MetaDataIndexUpgradeService.class), mock(MetaDataUpgrader.class),
mock(TransportService.class), mock(ClusterService.class),
mock(IndicesService.class));
this.localNode = localNode;
}

@Override
protected void upgradeMetaData(MetaDataIndexUpgradeService metaDataIndexUpgradeService, MetaDataUpgrader metaDataUpgrader) {
// MetaData upgrade is tested in GatewayMetaStateTests, we override this method to NOP to make mocking easier
}

@Override
public void applyClusterStateUpdaters() {
// Just set localNode here, not to mess with ClusterService and IndicesService mocking
previousClusterState = ClusterStateUpdaters.setLocalNode(previousClusterState, localNode);
}
}

private NodeEnvironment nodeEnvironment;
private ClusterName clusterName;
private Settings settings;
Expand All @@ -91,21 +62,21 @@ public void tearDown() throws Exception {
super.tearDown();
}

private GatewayMetaStateUT newGateway() throws IOException {
GatewayMetaStateUT gateway = new GatewayMetaStateUT(settings, nodeEnvironment, localNode);
private MockGatewayMetaState newGateway() throws IOException {
MockGatewayMetaState gateway = new MockGatewayMetaState(settings, nodeEnvironment, xContentRegistry(), localNode);
gateway.applyClusterStateUpdaters();
return gateway;
}

private GatewayMetaStateUT maybeNew(GatewayMetaStateUT gateway) throws IOException {
private MockGatewayMetaState maybeNew(MockGatewayMetaState gateway) throws IOException {
if (randomBoolean()) {
return newGateway();
}
return gateway;
}

public void testInitialState() throws IOException {
GatewayMetaStateUT gateway = newGateway();
MockGatewayMetaState gateway = newGateway();
ClusterState state = gateway.getLastAcceptedState();
assertThat(state.getClusterName(), equalTo(clusterName));
assertTrue(MetaData.isGlobalStateEquals(state.metaData(), MetaData.EMPTY_META_DATA));
Expand All @@ -117,7 +88,7 @@ public void testInitialState() throws IOException {
}

public void testSetCurrentTerm() throws IOException {
GatewayMetaStateUT gateway = newGateway();
MockGatewayMetaState gateway = newGateway();

for (int i = 0; i < randomIntBetween(1, 5); i++) {
final long currentTerm = randomNonNegativeLong();
Expand Down Expand Up @@ -171,7 +142,7 @@ private void assertClusterStateEqual(ClusterState expected, ClusterState actual)
}

public void testSetLastAcceptedState() throws IOException {
GatewayMetaStateUT gateway = newGateway();
MockGatewayMetaState gateway = newGateway();
final long term = randomNonNegativeLong();

for (int i = 0; i < randomIntBetween(1, 5); i++) {
Expand All @@ -194,7 +165,7 @@ public void testSetLastAcceptedState() throws IOException {
}

public void testSetLastAcceptedStateTermChanged() throws IOException {
GatewayMetaStateUT gateway = newGateway();
MockGatewayMetaState gateway = newGateway();

final String indexName = randomAlphaOfLength(10);
final int numberOfShards = randomIntBetween(1, 5);
Expand All @@ -218,7 +189,7 @@ public void testSetLastAcceptedStateTermChanged() throws IOException {
}

public void testCurrentTermAndTermAreDifferent() throws IOException {
GatewayMetaStateUT gateway = newGateway();
MockGatewayMetaState gateway = newGateway();

long currentTerm = randomNonNegativeLong();
long term = randomValueOtherThan(currentTerm, () -> randomNonNegativeLong());
Expand All @@ -233,7 +204,7 @@ public void testCurrentTermAndTermAreDifferent() throws IOException {
}

public void testMarkAcceptedConfigAsCommitted() throws IOException {
GatewayMetaStateUT gateway = newGateway();
MockGatewayMetaState gateway = newGateway();

//generate random coordinationMetaData with different lastAcceptedConfiguration and lastCommittedConfiguration
CoordinationMetaData coordinationMetaData;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.gateway;

import org.elasticsearch.cluster.metadata.MetaDataIndexUpgradeService;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.plugins.MetaDataUpgrader;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;

import static org.mockito.Mockito.mock;

/**
* {@link GatewayMetaState} constructor accepts a lot of arguments.
* It's not always easy / convenient to construct these dependencies.
* This class constructor takes far fewer dependencies and constructs usable {@link GatewayMetaState} with 2 restrictions:
* no metadata upgrade will be performed and no cluster state updaters will be run. This is sufficient for most of the tests.
* Metadata upgrade is tested in {@link GatewayMetaStateTests} and different {@link ClusterStateUpdaters} in
* {@link ClusterStateUpdatersTests}.
*/
public class MockGatewayMetaState extends GatewayMetaState {
private final DiscoveryNode localNode;

public MockGatewayMetaState(Settings settings, NodeEnvironment nodeEnvironment,
NamedXContentRegistry xContentRegistry, DiscoveryNode localNode) throws IOException {
super(settings, nodeEnvironment, new MetaStateService(nodeEnvironment, xContentRegistry),
mock(MetaDataIndexUpgradeService.class), mock(MetaDataUpgrader.class),
mock(TransportService.class), mock(ClusterService.class),
mock(IndicesService.class));
this.localNode = localNode;
}

@Override
protected void upgradeMetaData(MetaDataIndexUpgradeService metaDataIndexUpgradeService, MetaDataUpgrader metaDataUpgrader) {
// MetaData upgrade is tested in GatewayMetaStateTests, we override this method to NOP to make mocking easier
}

@Override
public void applyClusterStateUpdaters() {
// Just set localNode here, not to mess with ClusterService and IndicesService mocking
previousClusterState = ClusterStateUpdaters.setLocalNode(previousClusterState, localNode);
}
}