Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Be more resilient to partial network partitions #8720

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions src/main/java/org/elasticsearch/Version.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,16 @@
import org.elasticsearch.monitor.jvm.JvmInfo;

import java.io.IOException;
import java.io.Serializable;

/**
*/
@SuppressWarnings("deprecation")
public class Version implements Serializable {
public class Version {

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this still needed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mean removing the Serializable? yeah, because it's not. It depends on org.apache.lucene.util.Version

// The logic for ID is: XXYYZZAA, where XX is major version, YY is minor version, ZZ is revision, and AA is Beta/RC indicator
// AA values below 50 are beta builds, and below 99 are RC builds, with 99 indicating a release
// the (internal) format of the id is there so we can easily do after/before checks on the id

// NOTE: indexes created with 3.6 use this constant for e.g. analysis chain emulation (imperfect)
public static final org.apache.lucene.util.Version LUCENE_3_EMULATION_VERSION = org.apache.lucene.util.Version.LUCENE_4_0_0;

Expand Down Expand Up @@ -423,6 +422,7 @@ public static Version fromId(int id) {

/**
* Return the {@link Version} of Elasticsearch that has been used to create an index given its settings.
*
* @throws ElasticsearchIllegalStateException if the given index settings doesn't contain a value for the key {@value IndexMetaData#SETTING_VERSION_CREATED}
*/
public static Version indexCreated(Settings indexSettings) {
Expand Down Expand Up @@ -485,7 +485,7 @@ public static Version fromString(String version) {
}
return versionFromId;

} catch(NumberFormatException e) {
} catch (NumberFormatException e) {
throw new IllegalArgumentException("unable to parse version " + version, e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,11 @@
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.ProcessedClusterStateUpdateTask;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.*;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.common.Priority;
Expand Down Expand Up @@ -60,19 +58,19 @@ public class ShardStateAction extends AbstractComponent {
private final TransportService transportService;
private final ClusterService clusterService;
private final AllocationService allocationService;
private final ThreadPool threadPool;
private final RoutingService routingService;

private final BlockingQueue<ShardRoutingEntry> startedShardsQueue = ConcurrentCollections.newBlockingQueue();
private final BlockingQueue<ShardRoutingEntry> failedShardQueue = ConcurrentCollections.newBlockingQueue();

@Inject
public ShardStateAction(Settings settings, ClusterService clusterService, TransportService transportService,
AllocationService allocationService, ThreadPool threadPool) {
AllocationService allocationService, RoutingService routingService) {
super(settings);
this.clusterService = clusterService;
this.transportService = transportService;
this.allocationService = allocationService;
this.threadPool = threadPool;
this.routingService = routingService;

transportService.registerHandler(SHARD_STARTED_ACTION_NAME, new ShardStartedTransportHandler());
transportService.registerHandler(SHARD_FAILED_ACTION_NAME, new ShardFailedTransportHandler());
Expand Down Expand Up @@ -104,11 +102,11 @@ private void innerShardFailed(final ShardRouting shardRouting, final String inde
} else {
transportService.sendRequest(masterNode,
SHARD_FAILED_ACTION_NAME, shardRoutingEntry, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
@Override
public void handleException(TransportException exp) {
logger.warn("failed to send failed shard to {}", exp, masterNode);
}
});
@Override
public void handleException(TransportException exp) {
logger.warn("failed to send failed shard to {}", exp, masterNode);
}
});
}
}

Expand All @@ -132,18 +130,19 @@ public void shardStarted(final ShardRouting shardRouting, String indexUUID, fina
} else {
transportService.sendRequest(masterNode,
SHARD_STARTED_ACTION_NAME, new ShardRoutingEntry(shardRouting, indexUUID, reason), new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
@Override
public void handleException(TransportException exp) {
logger.warn("failed to send shard started to [{}]", exp, masterNode);
}
});
@Override
public void handleException(TransportException exp) {
logger.warn("failed to send shard started to [{}]", exp, masterNode);
}
});
}
}

private void innerShardFailed(final ShardRoutingEntry shardRoutingEntry) {
logger.warn("{} received shard failed for {}", shardRoutingEntry.shardRouting.shardId(), shardRoutingEntry);
failedShardQueue.add(shardRoutingEntry);
clusterService.submitStateUpdateTask("shard-failed (" + shardRoutingEntry.shardRouting + "), reason [" + shardRoutingEntry.reason + "]", Priority.HIGH, new ClusterStateUpdateTask() {
clusterService.submitStateUpdateTask("shard-failed (" + shardRoutingEntry.shardRouting + "), reason [" + shardRoutingEntry.reason + "]", Priority.HIGH, new ProcessedClusterStateUpdateTask() {

@Override
public ClusterState execute(ClusterState currentState) {
if (shardRoutingEntry.processed) {
Expand Down Expand Up @@ -191,6 +190,14 @@ public ClusterState execute(ClusterState currentState) {
public void onFailure(String source, Throwable t) {
logger.error("unexpected failure during [{}]", t, source);
}

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
if (oldState != newState && newState.getRoutingNodes().hasUnassigned()) {
logger.trace("unassigned shards after shard failures. scheduling a reroute.");
routingService.scheduleReroute();
}
}
});
}

Expand Down
47 changes: 32 additions & 15 deletions src/main/java/org/elasticsearch/cluster/node/DiscoveryNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,7 @@
import org.elasticsearch.Version;
import org.elasticsearch.common.Booleans;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.io.stream.*;
import org.elasticsearch.common.network.NetworkUtils;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
Expand Down Expand Up @@ -113,7 +111,8 @@ public static boolean dataNode(Settings settings) {
* the node might not be able to communicate with the remove node. After initial handshakes node versions will be discovered
* and updated.
* </p>
* @param nodeId the nodes unique id.
*
* @param nodeId the nodes unique id.
* @param address the nodes transport address
* @param version the version of the node.
*/
Expand All @@ -129,11 +128,12 @@ public DiscoveryNode(String nodeId, TransportAddress address, Version version) {
* the node might not be able to communicate with the remove node. After initial handshakes node versions will be discovered
* and updated.
* </p>
* @param nodeName the nodes name
* @param nodeId the nodes unique id.
* @param address the nodes transport address
*
* @param nodeName the nodes name
* @param nodeId the nodes unique id.
* @param address the nodes transport address
* @param attributes node attributes
* @param version the version of the node.
* @param version the version of the node.
*/
public DiscoveryNode(String nodeName, String nodeId, TransportAddress address, Map<String, String> attributes, Version version) {
this(nodeName, nodeId, NetworkUtils.getLocalHostName(""), NetworkUtils.getLocalHostAddress(""), address, attributes, version);
Expand All @@ -147,13 +147,14 @@ public DiscoveryNode(String nodeName, String nodeId, TransportAddress address, M
* the node might not be able to communicate with the remove node. After initial handshakes node versions will be discovered
* and updated.
* </p>
* @param nodeName the nodes name
* @param nodeId the nodes unique id.
* @param hostName the nodes hostname
*
* @param nodeName the nodes name
* @param nodeId the nodes unique id.
* @param hostName the nodes hostname
* @param hostAddress the nodes host address
* @param address the nodes transport address
* @param attributes node attributes
* @param version the version of the node.
* @param address the nodes transport address
* @param attributes node attributes
* @param version the version of the node.
*/
public DiscoveryNode(String nodeName, String nodeId, String hostName, String hostAddress, TransportAddress address, Map<String, String> attributes, Version version) {
if (nodeName != null) {
Expand Down Expand Up @@ -340,8 +341,9 @@ public void writeTo(StreamOutput out) throws IOException {

@Override
public boolean equals(Object obj) {
if (!(obj instanceof DiscoveryNode))
if (!(obj instanceof DiscoveryNode)) {
return false;
}

DiscoveryNode other = (DiscoveryNode) obj;
return this.nodeId.equals(other.nodeId);
Expand Down Expand Up @@ -372,4 +374,19 @@ public String toString() {
}
return sb.toString();
}

// we need this custom serialization logic because Version is not serializable (because org.apache.lucene.util.Version is not serializable)
private void writeObject(java.io.ObjectOutputStream out)
throws IOException {
StreamOutput streamOutput = new OutputStreamStreamOutput(out);
streamOutput.setVersion(Version.CURRENT.minimumCompatibilityVersion());
this.writeTo(streamOutput);
}

private void readObject(java.io.ObjectInputStream in)
throws IOException, ClassNotFoundException {
StreamInput streamInput = new InputStreamStreamInput(in);
streamInput.setVersion(Version.CURRENT.minimumCompatibilityVersion());
this.readFrom(streamInput);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,11 @@ protected void doClose() throws ElasticsearchException {
clusterService.remove(this);
}

/** make sure that a reroute will be done by the next scheduled check */
public void scheduleReroute() {
routingTableDirty = true;
}

@Override
public void clusterChanged(ClusterChangedEvent event) {
if (event.source().equals(CLUSTER_UPDATE_TASK_SOURCE)) {
Expand Down Expand Up @@ -153,8 +158,12 @@ public void onNoLongerMaster(String source) {

@Override
public void onFailure(String source, Throwable t) {
ClusterState state = clusterService.state();
ClusterState state = clusterService.state();
if (logger.isTraceEnabled()) {
logger.error("unexpected failure during [{}], current state:\n{}", t, source, state.prettyPrint());
} else {
logger.error("unexpected failure during [{}], current state version [{}]", t, source, state.version());
}
}
});
routingTableDirty = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.discovery.DiscoverySettings;
import org.elasticsearch.discovery.zen.ZenDiscovery;
import org.elasticsearch.discovery.zen.elect.ElectMasterService;
import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService;
import org.elasticsearch.indices.cache.filter.IndicesFilterCache;
import org.elasticsearch.indices.recovery.RecoverySettings;
Expand Down Expand Up @@ -77,7 +76,11 @@ public ClusterDynamicSettingsModule() {
clusterDynamicSettings.addDynamicSetting(RecoverySettings.INDICES_RECOVERY_CONCURRENT_STREAMS, Validator.POSITIVE_INTEGER);
clusterDynamicSettings.addDynamicSetting(RecoverySettings.INDICES_RECOVERY_CONCURRENT_SMALL_FILE_STREAMS, Validator.POSITIVE_INTEGER);
clusterDynamicSettings.addDynamicSetting(RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC, Validator.BYTES_SIZE);
clusterDynamicSettings.addDynamicSetting(RecoverySettings.INDICES_RECOVERY_RETRY_DELAY, Validator.TIME_NON_NEGATIVE);
clusterDynamicSettings.addDynamicSetting(RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_STATE_SYNC, Validator.TIME_NON_NEGATIVE);
clusterDynamicSettings.addDynamicSetting(RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_NETWORK, Validator.TIME_NON_NEGATIVE);
clusterDynamicSettings.addDynamicSetting(RecoverySettings.INDICES_RECOVERY_ACTIVITY_TIMEOUT, Validator.TIME_NON_NEGATIVE);
clusterDynamicSettings.addDynamicSetting(RecoverySettings.INDICES_RECOVERY_INTERNAL_ACTION_TIMEOUT, Validator.TIME_NON_NEGATIVE);
clusterDynamicSettings.addDynamicSetting(RecoverySettings.INDICES_RECOVERY_INTERNAL_LONG_ACTION_TIMEOUT, Validator.TIME_NON_NEGATIVE);
clusterDynamicSettings.addDynamicSetting(RecoverySettings.INDICES_RECOVERY_MAX_SIZE_PER_SEC, Validator.BYTES_SIZE);
clusterDynamicSettings.addDynamicSetting(ThreadPool.THREADPOOL_GROUP + "*");
clusterDynamicSettings.addDynamicSetting(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES, Validator.INTEGER);
Expand Down
22 changes: 14 additions & 8 deletions src/main/java/org/elasticsearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
package org.elasticsearch.index.shard;

import com.google.common.base.Charsets;

import org.apache.lucene.codecs.PostingsFormat;
import org.apache.lucene.index.CheckIndex;
import org.apache.lucene.search.Filter;
Expand Down Expand Up @@ -51,6 +50,7 @@
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.aliases.IndexAliasesService;
import org.elasticsearch.index.analysis.AnalysisService;
Expand All @@ -72,11 +72,7 @@
import org.elasticsearch.index.get.ShardGetService;
import org.elasticsearch.index.indexing.IndexingStats;
import org.elasticsearch.index.indexing.ShardIndexingService;
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.mapper.*;
import org.elasticsearch.index.mapper.internal.ParentFieldMapper;
import org.elasticsearch.index.merge.MergeStats;
import org.elasticsearch.index.merge.policy.MergePolicyProvider;
Expand All @@ -88,7 +84,6 @@
import org.elasticsearch.index.search.nested.NonNestedDocsFilter;
import org.elasticsearch.index.search.stats.SearchStats;
import org.elasticsearch.index.search.stats.ShardSearchService;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.settings.IndexSettingsService;
import org.elasticsearch.index.similarity.SimilarityService;
Expand All @@ -102,9 +97,9 @@
import org.elasticsearch.index.warmer.ShardIndexWarmerService;
import org.elasticsearch.index.warmer.WarmerStats;
import org.elasticsearch.indices.IndicesLifecycle;
import org.elasticsearch.indices.IndicesWarmer;
import org.elasticsearch.indices.InternalIndicesLifecycle;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.indices.IndicesWarmer;
import org.elasticsearch.search.suggest.completion.Completion090PostingsFormat;
import org.elasticsearch.search.suggest.completion.CompletionStats;
import org.elasticsearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -720,6 +715,17 @@ public void performRecoveryPrepareForTranslog() throws ElasticsearchException {
createNewEngine();
}

/** called if recovery has to be restarted after network error / delay ** */
public void performRecoveryRestart() throws IOException {
synchronized (mutex) {
if (state != IndexShardState.RECOVERING) {
throw new IndexShardNotRecoveringException(shardId, state);
}
final Engine engine = this.currentEngineReference.getAndSet(null);
IOUtils.close(engine);
}
}

/**
* The peer recovery state if this shard recovered from a peer shard, null o.w.
*/
Expand Down
Loading