diff --git a/src/main/java/org/elasticsearch/Version.java b/src/main/java/org/elasticsearch/Version.java
index 806fa921fd55c..6164342b6c4b8 100644
--- a/src/main/java/org/elasticsearch/Version.java
+++ b/src/main/java/org/elasticsearch/Version.java
@@ -30,17 +30,16 @@
import org.elasticsearch.monitor.jvm.JvmInfo;
import java.io.IOException;
-import java.io.Serializable;
/**
*/
@SuppressWarnings("deprecation")
-public class Version implements Serializable {
+public class Version {
// The logic for ID is: XXYYZZAA, where XX is major version, YY is minor version, ZZ is revision, and AA is Beta/RC indicator
// AA values below 50 are beta builds, and below 99 are RC builds, with 99 indicating a release
// the (internal) format of the id is there so we can easily do after/before checks on the id
-
+
// NOTE: indexes created with 3.6 use this constant for e.g. analysis chain emulation (imperfect)
public static final org.apache.lucene.util.Version LUCENE_3_EMULATION_VERSION = org.apache.lucene.util.Version.LUCENE_4_0_0;
@@ -423,6 +422,7 @@ public static Version fromId(int id) {
/**
* Return the {@link Version} of Elasticsearch that has been used to create an index given its settings.
+ *
* @throws ElasticsearchIllegalStateException if the given index settings doesn't contain a value for the key {@value IndexMetaData#SETTING_VERSION_CREATED}
*/
public static Version indexCreated(Settings indexSettings) {
@@ -485,7 +485,7 @@ public static Version fromString(String version) {
}
return versionFromId;
- } catch(NumberFormatException e) {
+ } catch (NumberFormatException e) {
throw new IllegalArgumentException("unable to parse version " + version, e);
}
}
diff --git a/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java b/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java
index 50e47cec63410..3d06d9a03cc69 100644
--- a/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java
+++ b/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java
@@ -23,13 +23,11 @@
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
+import org.elasticsearch.cluster.ProcessedClusterStateUpdateTask;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
-import org.elasticsearch.cluster.routing.IndexRoutingTable;
-import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
-import org.elasticsearch.cluster.routing.RoutingTable;
-import org.elasticsearch.cluster.routing.ShardRouting;
+import org.elasticsearch.cluster.routing.*;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.common.Priority;
@@ -60,19 +58,19 @@ public class ShardStateAction extends AbstractComponent {
private final TransportService transportService;
private final ClusterService clusterService;
private final AllocationService allocationService;
- private final ThreadPool threadPool;
+ private final RoutingService routingService;
private final BlockingQueue startedShardsQueue = ConcurrentCollections.newBlockingQueue();
private final BlockingQueue failedShardQueue = ConcurrentCollections.newBlockingQueue();
@Inject
public ShardStateAction(Settings settings, ClusterService clusterService, TransportService transportService,
- AllocationService allocationService, ThreadPool threadPool) {
+ AllocationService allocationService, RoutingService routingService) {
super(settings);
this.clusterService = clusterService;
this.transportService = transportService;
this.allocationService = allocationService;
- this.threadPool = threadPool;
+ this.routingService = routingService;
transportService.registerHandler(SHARD_STARTED_ACTION_NAME, new ShardStartedTransportHandler());
transportService.registerHandler(SHARD_FAILED_ACTION_NAME, new ShardFailedTransportHandler());
@@ -104,11 +102,11 @@ private void innerShardFailed(final ShardRouting shardRouting, final String inde
} else {
transportService.sendRequest(masterNode,
SHARD_FAILED_ACTION_NAME, shardRoutingEntry, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
- @Override
- public void handleException(TransportException exp) {
- logger.warn("failed to send failed shard to {}", exp, masterNode);
- }
- });
+ @Override
+ public void handleException(TransportException exp) {
+ logger.warn("failed to send failed shard to {}", exp, masterNode);
+ }
+ });
}
}
@@ -132,18 +130,19 @@ public void shardStarted(final ShardRouting shardRouting, String indexUUID, fina
} else {
transportService.sendRequest(masterNode,
SHARD_STARTED_ACTION_NAME, new ShardRoutingEntry(shardRouting, indexUUID, reason), new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
- @Override
- public void handleException(TransportException exp) {
- logger.warn("failed to send shard started to [{}]", exp, masterNode);
- }
- });
+ @Override
+ public void handleException(TransportException exp) {
+ logger.warn("failed to send shard started to [{}]", exp, masterNode);
+ }
+ });
}
}
private void innerShardFailed(final ShardRoutingEntry shardRoutingEntry) {
logger.warn("{} received shard failed for {}", shardRoutingEntry.shardRouting.shardId(), shardRoutingEntry);
failedShardQueue.add(shardRoutingEntry);
- clusterService.submitStateUpdateTask("shard-failed (" + shardRoutingEntry.shardRouting + "), reason [" + shardRoutingEntry.reason + "]", Priority.HIGH, new ClusterStateUpdateTask() {
+ clusterService.submitStateUpdateTask("shard-failed (" + shardRoutingEntry.shardRouting + "), reason [" + shardRoutingEntry.reason + "]", Priority.HIGH, new ProcessedClusterStateUpdateTask() {
+
@Override
public ClusterState execute(ClusterState currentState) {
if (shardRoutingEntry.processed) {
@@ -191,6 +190,14 @@ public ClusterState execute(ClusterState currentState) {
public void onFailure(String source, Throwable t) {
logger.error("unexpected failure during [{}]", t, source);
}
+
+ @Override
+ public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
+ if (oldState != newState && newState.getRoutingNodes().hasUnassigned()) { // the task produced a new state that still has unassigned shards
+ logger.trace("unassigned shards after shard failures. scheduling a reroute.");
+ routingService.scheduleReroute(); // hand the follow-up reroute over to the routing service
+ }
+ }
});
}
diff --git a/src/main/java/org/elasticsearch/cluster/node/DiscoveryNode.java b/src/main/java/org/elasticsearch/cluster/node/DiscoveryNode.java
index 7e22d94f2d258..13b0a62f0e6c5 100644
--- a/src/main/java/org/elasticsearch/cluster/node/DiscoveryNode.java
+++ b/src/main/java/org/elasticsearch/cluster/node/DiscoveryNode.java
@@ -25,9 +25,7 @@
import org.elasticsearch.Version;
import org.elasticsearch.common.Booleans;
import org.elasticsearch.common.Strings;
-import org.elasticsearch.common.io.stream.StreamInput;
-import org.elasticsearch.common.io.stream.StreamOutput;
-import org.elasticsearch.common.io.stream.Streamable;
+import org.elasticsearch.common.io.stream.*;
import org.elasticsearch.common.network.NetworkUtils;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
@@ -113,7 +111,8 @@ public static boolean dataNode(Settings settings) {
* the node might not be able to communicate with the remove node. After initial handshakes node versions will be discovered
* and updated.
*
- * @param nodeId the nodes unique id.
+ *
+ * @param nodeId the nodes unique id.
* @param address the nodes transport address
* @param version the version of the node.
*/
@@ -129,11 +128,12 @@ public DiscoveryNode(String nodeId, TransportAddress address, Version version) {
* the node might not be able to communicate with the remove node. After initial handshakes node versions will be discovered
* and updated.
*
- * @param nodeName the nodes name
- * @param nodeId the nodes unique id.
- * @param address the nodes transport address
+ *
+ * @param nodeName the nodes name
+ * @param nodeId the nodes unique id.
+ * @param address the nodes transport address
* @param attributes node attributes
- * @param version the version of the node.
+ * @param version the version of the node.
*/
public DiscoveryNode(String nodeName, String nodeId, TransportAddress address, Map attributes, Version version) {
this(nodeName, nodeId, NetworkUtils.getLocalHostName(""), NetworkUtils.getLocalHostAddress(""), address, attributes, version);
@@ -147,13 +147,14 @@ public DiscoveryNode(String nodeName, String nodeId, TransportAddress address, M
* the node might not be able to communicate with the remove node. After initial handshakes node versions will be discovered
* and updated.
*
- * @param nodeName the nodes name
- * @param nodeId the nodes unique id.
- * @param hostName the nodes hostname
+ *
+ * @param nodeName the nodes name
+ * @param nodeId the nodes unique id.
+ * @param hostName the nodes hostname
* @param hostAddress the nodes host address
- * @param address the nodes transport address
- * @param attributes node attributes
- * @param version the version of the node.
+ * @param address the nodes transport address
+ * @param attributes node attributes
+ * @param version the version of the node.
*/
public DiscoveryNode(String nodeName, String nodeId, String hostName, String hostAddress, TransportAddress address, Map attributes, Version version) {
if (nodeName != null) {
@@ -340,8 +341,9 @@ public void writeTo(StreamOutput out) throws IOException {
@Override
public boolean equals(Object obj) {
- if (!(obj instanceof DiscoveryNode))
+ if (!(obj instanceof DiscoveryNode)) {
return false;
+ }
DiscoveryNode other = (DiscoveryNode) obj;
return this.nodeId.equals(other.nodeId);
@@ -372,4 +374,19 @@ public String toString() {
}
return sb.toString();
}
+
+ // we need this custom serialization logic because Version is not serializable (because org.apache.lucene.util.Version is not serializable)
+ private void writeObject(java.io.ObjectOutputStream out)
+ throws IOException {
+ StreamOutput streamOutput = new OutputStreamStreamOutput(out); // wrap the Java object stream so writeTo can be reused
+ streamOutput.setVersion(Version.CURRENT.minimumCompatibilityVersion()); // same stream version that readObject sets
+ this.writeTo(streamOutput); // delegate to the regular writeTo encoding of this node
+ }
+
+ private void readObject(java.io.ObjectInputStream in)
+ throws IOException, ClassNotFoundException {
+ StreamInput streamInput = new InputStreamStreamInput(in); // wrap the Java object stream so readFrom can be reused
+ streamInput.setVersion(Version.CURRENT.minimumCompatibilityVersion()); // same stream version that writeObject sets
+ this.readFrom(streamInput); // populate this node through the regular readFrom decoding
+ }
}
diff --git a/src/main/java/org/elasticsearch/cluster/routing/RoutingService.java b/src/main/java/org/elasticsearch/cluster/routing/RoutingService.java
index b5dec0d0bf477..c969f6cb119ce 100644
--- a/src/main/java/org/elasticsearch/cluster/routing/RoutingService.java
+++ b/src/main/java/org/elasticsearch/cluster/routing/RoutingService.java
@@ -89,6 +89,11 @@ protected void doClose() throws ElasticsearchException {
clusterService.remove(this);
}
+ /** make sure that a reroute will be done by the next scheduled check (this method only raises the dirty flag) */
+ public void scheduleReroute() {
+ routingTableDirty = true; // nothing is submitted from here; the flag is all that changes
+ }
+
@Override
public void clusterChanged(ClusterChangedEvent event) {
if (event.source().equals(CLUSTER_UPDATE_TASK_SOURCE)) {
@@ -153,8 +158,12 @@ public void onNoLongerMaster(String source) {
@Override
public void onFailure(String source, Throwable t) {
- ClusterState state = clusterService.state();
+ ClusterState state = clusterService.state();
+ if (logger.isTraceEnabled()) { // include the full pretty-printed state only when trace logging is enabled
logger.error("unexpected failure during [{}], current state:\n{}", t, source, state.prettyPrint());
+ } else { // otherwise report just the state version
+ logger.error("unexpected failure during [{}], current state version [{}]", t, source, state.version());
+ }
}
});
routingTableDirty = false;
diff --git a/src/main/java/org/elasticsearch/cluster/settings/ClusterDynamicSettingsModule.java b/src/main/java/org/elasticsearch/cluster/settings/ClusterDynamicSettingsModule.java
index cb4aca624ea1c..e0884f013ee0a 100644
--- a/src/main/java/org/elasticsearch/cluster/settings/ClusterDynamicSettingsModule.java
+++ b/src/main/java/org/elasticsearch/cluster/settings/ClusterDynamicSettingsModule.java
@@ -28,7 +28,6 @@
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.discovery.DiscoverySettings;
import org.elasticsearch.discovery.zen.ZenDiscovery;
-import org.elasticsearch.discovery.zen.elect.ElectMasterService;
import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService;
import org.elasticsearch.indices.cache.filter.IndicesFilterCache;
import org.elasticsearch.indices.recovery.RecoverySettings;
@@ -77,7 +76,11 @@ public ClusterDynamicSettingsModule() {
clusterDynamicSettings.addDynamicSetting(RecoverySettings.INDICES_RECOVERY_CONCURRENT_STREAMS, Validator.POSITIVE_INTEGER);
clusterDynamicSettings.addDynamicSetting(RecoverySettings.INDICES_RECOVERY_CONCURRENT_SMALL_FILE_STREAMS, Validator.POSITIVE_INTEGER);
clusterDynamicSettings.addDynamicSetting(RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC, Validator.BYTES_SIZE);
- clusterDynamicSettings.addDynamicSetting(RecoverySettings.INDICES_RECOVERY_RETRY_DELAY, Validator.TIME_NON_NEGATIVE);
+ clusterDynamicSettings.addDynamicSetting(RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_STATE_SYNC, Validator.TIME_NON_NEGATIVE);
+ clusterDynamicSettings.addDynamicSetting(RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_NETWORK, Validator.TIME_NON_NEGATIVE);
+ clusterDynamicSettings.addDynamicSetting(RecoverySettings.INDICES_RECOVERY_ACTIVITY_TIMEOUT, Validator.TIME_NON_NEGATIVE);
+ clusterDynamicSettings.addDynamicSetting(RecoverySettings.INDICES_RECOVERY_INTERNAL_ACTION_TIMEOUT, Validator.TIME_NON_NEGATIVE);
+ clusterDynamicSettings.addDynamicSetting(RecoverySettings.INDICES_RECOVERY_INTERNAL_LONG_ACTION_TIMEOUT, Validator.TIME_NON_NEGATIVE);
clusterDynamicSettings.addDynamicSetting(RecoverySettings.INDICES_RECOVERY_MAX_SIZE_PER_SEC, Validator.BYTES_SIZE);
clusterDynamicSettings.addDynamicSetting(ThreadPool.THREADPOOL_GROUP + "*");
clusterDynamicSettings.addDynamicSetting(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES, Validator.INTEGER);
diff --git a/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/src/main/java/org/elasticsearch/index/shard/IndexShard.java
index 90e4723f82059..0b8af6cce6c0d 100644
--- a/src/main/java/org/elasticsearch/index/shard/IndexShard.java
+++ b/src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -20,7 +20,6 @@
package org.elasticsearch.index.shard;
import com.google.common.base.Charsets;
-
import org.apache.lucene.codecs.PostingsFormat;
import org.apache.lucene.index.CheckIndex;
import org.apache.lucene.search.Filter;
@@ -51,6 +50,7 @@
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.FutureUtils;
+import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.aliases.IndexAliasesService;
import org.elasticsearch.index.analysis.AnalysisService;
@@ -72,11 +72,7 @@
import org.elasticsearch.index.get.ShardGetService;
import org.elasticsearch.index.indexing.IndexingStats;
import org.elasticsearch.index.indexing.ShardIndexingService;
-import org.elasticsearch.index.mapper.DocumentMapper;
-import org.elasticsearch.index.mapper.MapperService;
-import org.elasticsearch.index.mapper.ParsedDocument;
-import org.elasticsearch.index.mapper.SourceToParse;
-import org.elasticsearch.index.mapper.Uid;
+import org.elasticsearch.index.mapper.*;
import org.elasticsearch.index.mapper.internal.ParentFieldMapper;
import org.elasticsearch.index.merge.MergeStats;
import org.elasticsearch.index.merge.policy.MergePolicyProvider;
@@ -88,7 +84,6 @@
import org.elasticsearch.index.search.nested.NonNestedDocsFilter;
import org.elasticsearch.index.search.stats.SearchStats;
import org.elasticsearch.index.search.stats.ShardSearchService;
-import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.settings.IndexSettingsService;
import org.elasticsearch.index.similarity.SimilarityService;
@@ -102,9 +97,9 @@
import org.elasticsearch.index.warmer.ShardIndexWarmerService;
import org.elasticsearch.index.warmer.WarmerStats;
import org.elasticsearch.indices.IndicesLifecycle;
+import org.elasticsearch.indices.IndicesWarmer;
import org.elasticsearch.indices.InternalIndicesLifecycle;
import org.elasticsearch.indices.recovery.RecoveryState;
-import org.elasticsearch.indices.IndicesWarmer;
import org.elasticsearch.search.suggest.completion.Completion090PostingsFormat;
import org.elasticsearch.search.suggest.completion.CompletionStats;
import org.elasticsearch.threadpool.ThreadPool;
@@ -720,6 +715,17 @@ public void performRecoveryPrepareForTranslog() throws ElasticsearchException {
createNewEngine();
}
+ /** called if recovery has to be restarted after network error / delay; throws IndexShardNotRecoveringException unless the shard state is RECOVERING */
+ public void performRecoveryRestart() throws IOException {
+ synchronized (mutex) {
+ if (state != IndexShardState.RECOVERING) {
+ throw new IndexShardNotRecoveringException(shardId, state);
+ }
+ final Engine engine = this.currentEngineReference.getAndSet(null); // detach whatever engine is currently referenced
+ IOUtils.close(engine); // and close the detached engine
+ }
+ }
+
/**
* The peer recovery state if this shard recovered from a peer shard, null o.w.
*/
diff --git a/src/main/java/org/elasticsearch/indices/recovery/RecoveriesCollection.java b/src/main/java/org/elasticsearch/indices/recovery/RecoveriesCollection.java
index 16ffe8fe98bb2..6df9d94acafd3 100644
--- a/src/main/java/org/elasticsearch/indices/recovery/RecoveriesCollection.java
+++ b/src/main/java/org/elasticsearch/indices/recovery/RecoveriesCollection.java
@@ -19,13 +19,17 @@
package org.elasticsearch.indices.recovery;
+import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.logging.ESLogger;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardClosedException;
import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.threadpool.ThreadPool;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -42,9 +46,11 @@ public class RecoveriesCollection {
private final ConcurrentMap onGoingRecoveries = ConcurrentCollections.newConcurrentMap();
final private ESLogger logger;
+ final private ThreadPool threadPool;
- public RecoveriesCollection(ESLogger logger) {
+ public RecoveriesCollection(ESLogger logger, ThreadPool threadPool) {
this.logger = logger;
+ this.threadPool = threadPool; // used to schedule the RecoveryMonitor checks
}
/**
@@ -52,11 +58,14 @@ public RecoveriesCollection(ESLogger logger) {
*
* @return the id of the new recovery.
*/
- public long startRecovery(IndexShard indexShard, DiscoveryNode sourceNode, RecoveryState state, RecoveryTarget.RecoveryListener listener) {
+ public long startRecovery(IndexShard indexShard, DiscoveryNode sourceNode, RecoveryState state,
+ RecoveryTarget.RecoveryListener listener, TimeValue activityTimeout) {
RecoveryStatus status = new RecoveryStatus(indexShard, sourceNode, state, listener);
RecoveryStatus existingStatus = onGoingRecoveries.putIfAbsent(status.recoveryId(), status);
assert existingStatus == null : "found two RecoveryStatus instances with the same id";
logger.trace("{} started recovery from {}, id [{}]", indexShard.shardId(), sourceNode, status.recoveryId());
+ threadPool.schedule(activityTimeout, ThreadPool.Names.GENERIC, // first inactivity check is scheduled activityTimeout from now
+ new RecoveryMonitor(status.recoveryId(), status.lastAccessTime(), activityTimeout)); // activityTimeout also becomes the monitor's check interval
return status.recoveryId();
}
@@ -180,5 +189,45 @@ public RecoveryStatus status() {
return status;
}
}
+
+ private class RecoveryMonitor extends AbstractRunnable { // fails a recovery whose last access time did not change between two checks
+ private final long recoveryId;
+ private final TimeValue checkInterval;
+
+ private long lastSeenAccessTime; // access time observed by the previous check
+
+ private RecoveryMonitor(long recoveryId, long lastSeenAccessTime, TimeValue checkInterval) {
+ this.recoveryId = recoveryId;
+ this.checkInterval = checkInterval;
+ this.lastSeenAccessTime = lastSeenAccessTime;
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ logger.error("unexpected error while monitoring recovery [{}]", t, recoveryId);
+ }
+
+ @Override
+ protected void doRun() throws Exception {
+ RecoveryStatus status = onGoingRecoveries.get(recoveryId);
+ if (status == null) { // recovery is no longer registered: stop without rescheduling
+ logger.trace("[monitor] no status found for [{}], shutting down", recoveryId);
+ return;
+ }
+ long accessTime = status.lastAccessTime();
+ if (accessTime == lastSeenAccessTime) { // status was not touched since the previous check
+ String message = "no activity after [" + checkInterval + "]";
+ failRecovery(recoveryId,
+ new RecoveryFailedException(status.state(), message, new ElasticsearchTimeoutException(message)),
+ true // to be safe, we don't know what got stuck
+ );
+ return;
+ }
+ lastSeenAccessTime = accessTime;
+ logger.trace("[monitor] rescheduling check for [{}]. last access time is [{}]", recoveryId, lastSeenAccessTime);
+ threadPool.schedule(checkInterval, ThreadPool.Names.GENERIC, this); // re-arm this same monitor for the next interval
+ }
+ }
+
}
diff --git a/src/main/java/org/elasticsearch/indices/recovery/RecoverySettings.java b/src/main/java/org/elasticsearch/indices/recovery/RecoverySettings.java
index 729d40f6c78f9..5759078c06964 100644
--- a/src/main/java/org/elasticsearch/indices/recovery/RecoverySettings.java
+++ b/src/main/java/org/elasticsearch/indices/recovery/RecoverySettings.java
@@ -47,7 +47,31 @@ public class RecoverySettings extends AbstractComponent implements Closeable {
public static final String INDICES_RECOVERY_CONCURRENT_STREAMS = "indices.recovery.concurrent_streams";
public static final String INDICES_RECOVERY_CONCURRENT_SMALL_FILE_STREAMS = "indices.recovery.concurrent_small_file_streams";
public static final String INDICES_RECOVERY_MAX_BYTES_PER_SEC = "indices.recovery.max_bytes_per_sec";
- public static final String INDICES_RECOVERY_RETRY_DELAY = "indices.recovery.retry_delay";
+
+ /**
+ * how long to wait before retrying after issues caused by cluster state syncing between nodes
+ * i.e., local node is not yet known on remote node, remote shard not yet started etc.
+ */
+ public static final String INDICES_RECOVERY_RETRY_DELAY_STATE_SYNC = "indices.recovery.retry_delay_state_sync";
+
+ /** how long to wait before retrying after network related issues */
+ public static final String INDICES_RECOVERY_RETRY_DELAY_NETWORK = "indices.recovery.retry_delay_network";
+
+ /**
+ * recoveries that don't show any activity for more than this interval will be failed.
+ * defaults to `indices.recovery.internal_action_long_timeout`
+ */
+ public static final String INDICES_RECOVERY_ACTIVITY_TIMEOUT = "indices.recovery.recovery_activity_timeout";
+
+ /** timeout value to use for requests made as part of the recovery process */
+ public static final String INDICES_RECOVERY_INTERNAL_ACTION_TIMEOUT = "indices.recovery.internal_action_timeout";
+
+ /**
+ * timeout value to use for requests made as part of the recovery process that are expected to take long time.
+ * defaults to twice `indices.recovery.internal_action_timeout`.
+ */
+ public static final String INDICES_RECOVERY_INTERNAL_LONG_ACTION_TIMEOUT = "indices.recovery.internal_action_long_timeout";
+
public static final long SMALL_FILE_CUTOFF_BYTES = ByteSizeValue.parseBytesSizeValue("5mb").bytes();
@@ -70,17 +94,35 @@ public class RecoverySettings extends AbstractComponent implements Closeable {
private volatile ByteSizeValue maxBytesPerSec;
private volatile SimpleRateLimiter rateLimiter;
- private volatile TimeValue retryDelay;
+ private volatile TimeValue retryDelayStateSync;
+ private volatile TimeValue retryDelayNetwork;
+ private volatile TimeValue activityTimeout;
+ private volatile TimeValue internalActionTimeout;
+ private volatile TimeValue internalActionLongTimeout;
+
@Inject
public RecoverySettings(Settings settings, NodeSettingsService nodeSettingsService) {
super(settings);
- this.fileChunkSize = componentSettings.getAsBytesSize("file_chunk_size", settings.getAsBytesSize("index.shard.recovery.file_chunk_size", new ByteSizeValue(512, ByteSizeUnit.KB)));
- this.translogOps = componentSettings.getAsInt("translog_ops", settings.getAsInt("index.shard.recovery.translog_ops", 1000));
- this.translogSize = componentSettings.getAsBytesSize("translog_size", settings.getAsBytesSize("index.shard.recovery.translog_size", new ByteSizeValue(512, ByteSizeUnit.KB)));
- this.compress = componentSettings.getAsBoolean("compress", true);
- this.retryDelay = componentSettings.getAsTime("retry_delay", TimeValue.timeValueMillis(500));
+ this.fileChunkSize = settings.getAsBytesSize(INDICES_RECOVERY_FILE_CHUNK_SIZE, settings.getAsBytesSize("index.shard.recovery.file_chunk_size", new ByteSizeValue(512, ByteSizeUnit.KB)));
+ this.translogOps = settings.getAsInt(INDICES_RECOVERY_TRANSLOG_OPS, settings.getAsInt("index.shard.recovery.translog_ops", 1000));
+ this.translogSize = settings.getAsBytesSize(INDICES_RECOVERY_TRANSLOG_SIZE, settings.getAsBytesSize("index.shard.recovery.translog_size", new ByteSizeValue(512, ByteSizeUnit.KB)));
+ this.compress = settings.getAsBoolean(INDICES_RECOVERY_COMPRESS, true);
+
+ this.retryDelayStateSync = settings.getAsTime(INDICES_RECOVERY_RETRY_DELAY_STATE_SYNC, TimeValue.timeValueMillis(500));
+ // doesn't have to be fast as nodes are reconnected every 10s by default (see InternalClusterService.ReconnectToNodes)
+ // and we want to give the master time to remove a faulty node
+ this.retryDelayNetwork = settings.getAsTime(INDICES_RECOVERY_RETRY_DELAY_NETWORK, TimeValue.timeValueSeconds(5));
+
+ this.internalActionTimeout = settings.getAsTime(INDICES_RECOVERY_INTERNAL_ACTION_TIMEOUT, TimeValue.timeValueMinutes(15));
+ this.internalActionLongTimeout = settings.getAsTime(INDICES_RECOVERY_INTERNAL_LONG_ACTION_TIMEOUT, new TimeValue(internalActionTimeout.millis() * 2));
+
+ this.activityTimeout = settings.getAsTime(INDICES_RECOVERY_ACTIVITY_TIMEOUT,
+ // default to the internalActionLongTimeout used as timeouts on RecoverySource
+ internalActionLongTimeout
+ );
+
this.concurrentStreams = componentSettings.getAsInt("concurrent_streams", settings.getAsInt("index.shard.recovery.concurrent_streams", 3));
this.concurrentStreamPool = EsExecutors.newScaling(0, concurrentStreams, 60, TimeUnit.SECONDS, EsExecutors.daemonThreadFactory(settings, "[recovery_stream]"));
@@ -136,7 +178,26 @@ public RateLimiter rateLimiter() {
return rateLimiter;
}
- public TimeValue retryDelay() { return retryDelay; }
+ public TimeValue retryDelayNetwork() { // current value of INDICES_RECOVERY_RETRY_DELAY_NETWORK: wait before retrying after network related issues
+ return retryDelayNetwork;
+ }
+
+ public TimeValue retryDelayStateSync() { // current value of INDICES_RECOVERY_RETRY_DELAY_STATE_SYNC: wait before retrying after cluster state sync issues
+ return retryDelayStateSync;
+ }
+
+ public TimeValue activityTimeout() { // current value of INDICES_RECOVERY_ACTIVITY_TIMEOUT: inactivity interval after which a recovery is failed
+ return activityTimeout;
+ }
+
+ public TimeValue internalActionTimeout() { // current value of INDICES_RECOVERY_INTERNAL_ACTION_TIMEOUT: timeout for requests made as part of recovery
+ return internalActionTimeout;
+ }
+
+ public TimeValue internalActionLongTimeout() { // current value of INDICES_RECOVERY_INTERNAL_LONG_ACTION_TIMEOUT: timeout for recovery requests expected to take long
+ return internalActionLongTimeout;
+ }
+
class ApplySettings implements NodeSettingsService.Listener {
@Override
@@ -191,11 +252,21 @@ public void onRefreshSettings(Settings settings) {
RecoverySettings.this.concurrentSmallFileStreams = concurrentSmallFileStreams;
RecoverySettings.this.concurrentSmallFileStreamPool.setMaximumPoolSize(concurrentSmallFileStreams);
}
- final TimeValue retryDelay = settings.getAsTime(INDICES_RECOVERY_RETRY_DELAY, RecoverySettings.this.retryDelay);
- if (retryDelay.equals(RecoverySettings.this.retryDelay) == false) {
- logger.info("updating [] from [{}] to [{}]",INDICES_RECOVERY_RETRY_DELAY, RecoverySettings.this.retryDelay, retryDelay);
- RecoverySettings.this.retryDelay = retryDelay;
+
+ RecoverySettings.this.retryDelayNetwork = maybeUpdate(RecoverySettings.this.retryDelayNetwork, settings, INDICES_RECOVERY_RETRY_DELAY_NETWORK);
+ RecoverySettings.this.retryDelayStateSync = maybeUpdate(RecoverySettings.this.retryDelayStateSync, settings, INDICES_RECOVERY_RETRY_DELAY_STATE_SYNC);
+ RecoverySettings.this.activityTimeout = maybeUpdate(RecoverySettings.this.activityTimeout, settings, INDICES_RECOVERY_ACTIVITY_TIMEOUT);
+ RecoverySettings.this.internalActionTimeout = maybeUpdate(RecoverySettings.this.internalActionTimeout, settings, INDICES_RECOVERY_INTERNAL_ACTION_TIMEOUT);
+ RecoverySettings.this.internalActionLongTimeout = maybeUpdate(RecoverySettings.this.internalActionLongTimeout, settings, INDICES_RECOVERY_INTERNAL_LONG_ACTION_TIMEOUT);
+ }
+
+ private TimeValue maybeUpdate(final TimeValue currentValue, final Settings settings, final String key) { // returns the value to store for key; logs only when it differs from currentValue
+ final TimeValue value = settings.getAsTime(key, currentValue); // currentValue is passed as the default for the lookup
+ if (value.equals(currentValue)) {
+ return currentValue;
}
+ logger.info("updating [{}] from [{}] to [{}]", key, currentValue, value);
+ return value;
}
}
}
diff --git a/src/main/java/org/elasticsearch/indices/recovery/RecoverySource.java b/src/main/java/org/elasticsearch/indices/recovery/RecoverySource.java
index f42c673245d36..7eb60bc4848c7 100644
--- a/src/main/java/org/elasticsearch/indices/recovery/RecoverySource.java
+++ b/src/main/java/org/elasticsearch/indices/recovery/RecoverySource.java
@@ -58,8 +58,6 @@ public static class Actions {
private final ClusterService clusterService;
- private final TimeValue internalActionTimeout;
- private final TimeValue internalActionLongTimeout;
private final OngoingRecoveres ongoingRecoveries = new OngoingRecoveres();
@@ -83,8 +81,6 @@ public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexSh
this.recoverySettings = recoverySettings;
transportService.registerHandler(Actions.START_RECOVERY, new StartRecoveryTransportRequestHandler());
- this.internalActionTimeout = componentSettings.getAsTime("internal_action_timeout", TimeValue.timeValueMinutes(15));
- this.internalActionLongTimeout = new TimeValue(internalActionTimeout.millis() * 2);
}
private RecoveryResponse recover(final StartRecoveryRequest request) {
@@ -117,8 +113,7 @@ private RecoveryResponse recover(final StartRecoveryRequest request) {
logger.trace("[{}][{}] starting recovery to {}, mark_as_relocated {}", request.shardId().index().name(), request.shardId().id(), request.targetNode(), request.markAsRelocated());
- final ShardRecoveryHandler handler = new ShardRecoveryHandler(shard, request, recoverySettings, transportService, internalActionTimeout,
- internalActionLongTimeout, clusterService, indicesService, mappingUpdatedAction, logger);
+ final ShardRecoveryHandler handler = new ShardRecoveryHandler(shard, request, recoverySettings, transportService, clusterService, indicesService, mappingUpdatedAction, logger);
ongoingRecoveries.add(shard, handler);
try {
shard.recover(handler);
diff --git a/src/main/java/org/elasticsearch/indices/recovery/RecoveryStatus.java b/src/main/java/org/elasticsearch/indices/recovery/RecoveryStatus.java
index aaf2f7cda1af2..39e15532a843c 100644
--- a/src/main/java/org/elasticsearch/indices/recovery/RecoveryStatus.java
+++ b/src/main/java/org/elasticsearch/indices/recovery/RecoveryStatus.java
@@ -70,7 +70,11 @@ public class RecoveryStatus extends AbstractRefCounted {
private final CancellableThreads cancellableThreads = new CancellableThreads();
+ // last time this status was accessed
+ private volatile long lastAccessTime = System.nanoTime();
+
public RecoveryStatus(IndexShard indexShard, DiscoveryNode sourceNode, RecoveryState state, RecoveryTarget.RecoveryListener listener) {
+
super("recovery_status");
this.recoveryId = idGenerator.incrementAndGet();
this.listener = listener;
@@ -113,6 +117,16 @@ public CancellableThreads CancellableThreads() {
return cancellableThreads;
}
+ /** return the last time this RecoveryStatus was used (based on System.nanoTime()) */
+ public long lastAccessTime() {
+ return lastAccessTime;
+ }
+
+ /** sets lastAccessTime to the current System.nanoTime() value */
+ public void setLastAccessTime() {
+ lastAccessTime = System.nanoTime();
+ }
+
public Store store() {
ensureRefCount();
return store;
@@ -219,33 +233,42 @@ public IndexOutput openAndPutIndexOutput(String fileName, StoreFileMetaData meta
return indexOutput;
}
+ public void resetRecovery() throws IOException {
+ cleanOpenFiles();
+ indexShard().performRecoveryRestart();
+ }
+
@Override
protected void closeInternal() {
try {
- // clean open index outputs
- Iterator> iterator = openIndexOutputs.entrySet().iterator();
- while (iterator.hasNext()) {
- Map.Entry entry = iterator.next();
- logger.trace("closing IndexOutput file [{}]", entry.getValue());
- try {
- entry.getValue().close();
- } catch (Throwable t) {
- logger.debug("error while closing recovery output [{}]", t, entry.getValue());
- }
- iterator.remove();
- }
- // trash temporary files
- for (String file : tempFileNames.keySet()) {
- logger.trace("cleaning temporary file [{}]", file);
- store.deleteQuiet(file);
- }
- legacyChecksums.clear();
+ cleanOpenFiles();
} finally {
// free store. increment happens in constructor
store.decRef();
}
}
+ protected void cleanOpenFiles() {
+ // clean open index outputs
+ Iterator> iterator = openIndexOutputs.entrySet().iterator();
+ while (iterator.hasNext()) {
+ Map.Entry entry = iterator.next();
+ logger.trace("closing IndexOutput file [{}]", entry.getValue());
+ try {
+ entry.getValue().close();
+ } catch (Throwable t) {
+ logger.debug("error while closing recovery output [{}]", t, entry.getValue());
+ }
+ iterator.remove();
+ }
+ // trash temporary files
+ for (String file : tempFileNames.keySet()) {
+ logger.trace("cleaning temporary file [{}]", file);
+ store.deleteQuiet(file);
+ }
+ legacyChecksums.clear();
+ }
+
@Override
public String toString() {
return shardId + " [" + recoveryId + "]";
diff --git a/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java b/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java
index e1e7fba6a0488..aada2fe0ee313 100644
--- a/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java
+++ b/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java
@@ -46,6 +46,7 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;
+import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
@@ -87,7 +88,7 @@ public RecoveryTarget(Settings settings, ThreadPool threadPool, TransportService
this.transportService = transportService;
this.recoverySettings = recoverySettings;
this.clusterService = clusterService;
- this.onGoingRecoveries = new RecoveriesCollection(logger);
+ this.onGoingRecoveries = new RecoveriesCollection(logger, threadPool);
transportService.registerHandler(Actions.FILES_INFO, new FilesInfoRequestHandler());
transportService.registerHandler(Actions.FILE_CHUNK, new FileChunkTransportRequestHandler());
@@ -136,14 +137,19 @@ public void startRecovery(final IndexShard indexShard, final RecoveryState.Type
recoveryState.setSourceNode(sourceNode);
recoveryState.setTargetNode(clusterService.localNode());
recoveryState.setPrimary(indexShard.routingEntry().primary());
- final long recoveryId = onGoingRecoveries.startRecovery(indexShard, sourceNode, recoveryState, listener);
+ final long recoveryId = onGoingRecoveries.startRecovery(indexShard, sourceNode, recoveryState, listener, recoverySettings.activityTimeout());
threadPool.generic().execute(new RecoveryRunner(recoveryId));
}
- protected void retryRecovery(final long recoveryId, TimeValue retryAfter) {
- logger.trace("will retrying recovery with id [{}] in [{}]", recoveryId, retryAfter);
- threadPool.schedule(retryAfter, ThreadPool.Names.GENERIC, new RecoveryRunner(recoveryId));
+ protected void retryRecovery(final RecoveryStatus recoveryStatus, final String reason, TimeValue retryAfter, final StartRecoveryRequest currentRequest) {
+ logger.trace("will retrying recovery with id [{}] in [{}] (reason [{}])", recoveryStatus.recoveryId(), retryAfter, reason);
+ try {
+ recoveryStatus.resetRecovery();
+ } catch (IOException e) {
+ onGoingRecoveries.failRecovery(recoveryStatus.recoveryId(), new RecoveryFailedException(currentRequest, e), true);
+ }
+ threadPool.schedule(retryAfter, ThreadPool.Names.GENERIC, new RecoveryRunner(recoveryStatus.recoveryId()));
}
private void doRecovery(final RecoveryStatus recoveryStatus) {
@@ -204,6 +210,7 @@ public RecoveryResponse newInstance() {
} catch (CancellableThreads.ExecutionCancelledException e) {
logger.trace("recovery cancelled", e);
} catch (Throwable e) {
+
if (logger.isTraceEnabled()) {
logger.trace("[{}][{}] Got exception on recovery", e, request.shardId().index().name(), request.shardId().id());
}
@@ -223,17 +230,18 @@ public RecoveryResponse newInstance() {
if (cause instanceof IndexShardNotStartedException || cause instanceof IndexMissingException || cause instanceof IndexShardMissingException) {
// if the target is not ready yet, retry
- retryRecovery(recoveryStatus.recoveryId(), recoverySettings.retryDelay());
+ retryRecovery(recoveryStatus, "remote shard not ready", recoverySettings.retryDelayStateSync(), request);
return;
}
if (cause instanceof DelayRecoveryException) {
- retryRecovery(recoveryStatus.recoveryId(), recoverySettings.retryDelay());
+ retryRecovery(recoveryStatus, cause.getMessage(), recoverySettings.retryDelayStateSync(), request);
return;
}
if (cause instanceof ConnectTransportException) {
- onGoingRecoveries.failRecovery(recoveryStatus.recoveryId(), new RecoveryFailedException(request, "source node disconnected", cause), false);
+ logger.debug("delaying recovery of {} for [{}] due to networking error [{}]", recoveryStatus.shardId(), recoverySettings.retryDelayNetwork(), cause.getMessage());
+ retryRecovery(recoveryStatus, cause.getMessage(), recoverySettings.retryDelayNetwork(), request);
return;
}
diff --git a/src/main/java/org/elasticsearch/indices/recovery/ShardRecoveryHandler.java b/src/main/java/org/elasticsearch/indices/recovery/ShardRecoveryHandler.java
index a45b3a01ee4f4..00e1ab68fef1f 100644
--- a/src/main/java/org/elasticsearch/indices/recovery/ShardRecoveryHandler.java
+++ b/src/main/java/org/elasticsearch/indices/recovery/ShardRecoveryHandler.java
@@ -81,8 +81,6 @@ public final class ShardRecoveryHandler implements Engine.RecoveryHandler {
private final StartRecoveryRequest request;
private final RecoverySettings recoverySettings;
private final TransportService transportService;
- private final TimeValue internalActionTimeout;
- private final TimeValue internalActionLongTimeout;
private final ClusterService clusterService;
private final IndexService indexService;
private final MappingUpdatedAction mappingUpdatedAction;
@@ -106,16 +104,13 @@ protected void onCancel(String reason, @Nullable Throwable suppressedException)
public ShardRecoveryHandler(final IndexShard shard, final StartRecoveryRequest request, final RecoverySettings recoverySettings,
- final TransportService transportService, final TimeValue internalActionTimeout,
- final TimeValue internalActionLongTimeout, final ClusterService clusterService,
+ final TransportService transportService, final ClusterService clusterService,
final IndicesService indicesService, final MappingUpdatedAction mappingUpdatedAction, final ESLogger logger) {
this.shard = shard;
this.request = request;
this.recoverySettings = recoverySettings;
this.logger = logger;
this.transportService = transportService;
- this.internalActionTimeout = internalActionTimeout;
- this.internalActionLongTimeout = internalActionLongTimeout;
this.clusterService = clusterService;
this.indexName = this.request.shardId().index().name();
this.shardId = this.request.shardId().id();
@@ -203,7 +198,7 @@ public void run() throws InterruptedException {
response.phase1FileNames, response.phase1FileSizes, response.phase1ExistingFileNames, response.phase1ExistingFileSizes,
response.phase1TotalSize, response.phase1ExistingTotalSize);
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FILES_INFO, recoveryInfoFilesRequest,
- TransportRequestOptions.options().withTimeout(internalActionTimeout),
+ TransportRequestOptions.options().withTimeout(recoverySettings.internalActionTimeout()),
EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
}
});
@@ -266,7 +261,7 @@ protected void doRun() {
final TransportRequestOptions requestOptions = TransportRequestOptions.options()
.withCompress(shouldCompressRequest)
.withType(TransportRequestOptions.Type.RECOVERY)
- .withTimeout(internalActionTimeout);
+ .withTimeout(recoverySettings.internalActionTimeout());
while (readCount < len) {
if (shard.state() == IndexShardState.CLOSED) { // check if the shard got closed on us
@@ -351,7 +346,7 @@ public void run() throws InterruptedException {
// are deleted
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.CLEAN_FILES,
new RecoveryCleanFilesRequest(request.recoveryId(), shard.shardId(), recoverySourceMetadata),
- TransportRequestOptions.options().withTimeout(internalActionTimeout),
+ TransportRequestOptions.options().withTimeout(recoverySettings.internalActionTimeout()),
EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
}
});
@@ -393,7 +388,7 @@ public void run() throws InterruptedException {
// garbage collection (not the JVM's GC!) of tombstone deletes
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.PREPARE_TRANSLOG,
new RecoveryPrepareForTranslogOperationsRequest(request.recoveryId(), request.shardId()),
- TransportRequestOptions.options().withTimeout(internalActionTimeout), EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
+ TransportRequestOptions.options().withTimeout(recoverySettings.internalActionTimeout()), EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
}
});
@@ -450,7 +445,7 @@ public void run() throws InterruptedException {
// during this time
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FINALIZE,
new RecoveryFinalizeRecoveryRequest(request.recoveryId(), request.shardId()),
- TransportRequestOptions.options().withTimeout(internalActionLongTimeout),
+ TransportRequestOptions.options().withTimeout(recoverySettings.internalActionLongTimeout()),
EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
}
});
@@ -546,9 +541,9 @@ public void onFailure(Throwable t) {
@Override
public void run() throws InterruptedException {
try {
- if (!updatedOnMaster.await(internalActionTimeout.millis(), TimeUnit.MILLISECONDS)) {
+ if (!updatedOnMaster.await(recoverySettings.internalActionTimeout().millis(), TimeUnit.MILLISECONDS)) {
logger.debug("[{}][{}] recovery [phase2] to {}: waiting on pending mapping update timed out. waited [{}]",
- indexName, shardId, request.targetNode(), internalActionTimeout);
+ indexName, shardId, request.targetNode(), recoverySettings.internalActionTimeout());
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
@@ -576,7 +571,7 @@ private int sendSnapshot(Translog.Snapshot snapshot) throws ElasticsearchExcepti
final TransportRequestOptions recoveryOptions = TransportRequestOptions.options()
.withCompress(recoverySettings.compress())
.withType(TransportRequestOptions.Type.RECOVERY)
- .withTimeout(internalActionLongTimeout);
+ .withTimeout(recoverySettings.internalActionLongTimeout());
while (operation != null) {
if (shard.state() == IndexShardState.CLOSED) {
diff --git a/src/test/java/org/elasticsearch/VersionTests.java b/src/test/java/org/elasticsearch/VersionTests.java
index 24460f5d43ea3..c54a7ee7a09a5 100644
--- a/src/test/java/org/elasticsearch/VersionTests.java
+++ b/src/test/java/org/elasticsearch/VersionTests.java
@@ -76,6 +76,7 @@ public void testVersionConstantPresent() {
assertThat(version.luceneVersion, sameInstance(Version.fromId(version.id).luceneVersion));
}
}
+
@Test
public void testCURRENTIsLatest() {
final int iters = scaledRandomIntBetween(100, 1000);
diff --git a/src/test/java/org/elasticsearch/cluster/node/DiscoveryNodeTests.java b/src/test/java/org/elasticsearch/cluster/node/DiscoveryNodeTests.java
new file mode 100644
index 0000000000000..ace51a3587e4e
--- /dev/null
+++ b/src/test/java/org/elasticsearch/cluster/node/DiscoveryNodeTests.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.cluster.node;
+
+import org.elasticsearch.Version;
+import org.elasticsearch.common.io.ThrowableObjectInputStream;
+import org.elasticsearch.common.io.ThrowableObjectOutputStream;
+import org.elasticsearch.common.io.stream.BytesStreamInput;
+import org.elasticsearch.common.io.stream.BytesStreamOutput;
+import org.elasticsearch.common.transport.LocalTransportAddress;
+import org.elasticsearch.common.transport.TransportAddress;
+import org.elasticsearch.test.ElasticsearchTestCase;
+import org.hamcrest.Matchers;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+public class DiscoveryNodeTests extends ElasticsearchTestCase {
+
+
+ @Test
+ public void testJavaSerializablilty() throws IOException, ClassNotFoundException {
+ final int iters = scaledRandomIntBetween(100, 300);
+ for (int i = 0; i < iters; i++) {
+ final String id = randomUnicodeOfLengthBetween(3, 20);
+ final String nodeName = randomUnicodeOfLengthBetween(3, 20);
+ final String hostName = randomUnicodeOfLengthBetween(3, 20);
+ final String hostAddress = randomUnicodeOfLengthBetween(3, 20);
+ final TransportAddress transportAddress = new LocalTransportAddress(randomUnicodeOfLengthBetween(3, 20));
+ final Map attributes = new HashMap<>();
+ for (int a = randomInt(10); a > 0; a--) {
+ attributes.put(randomUnicodeOfLengthBetween(3, 20), randomUnicodeOfLengthBetween(3, 20));
+ }
+ final Version version = randomVersion();
+ DiscoveryNode discoveryNode = new DiscoveryNode(nodeName, id, hostName, hostAddress, transportAddress, attributes, version);
+ BytesStreamOutput bytesOutput = new BytesStreamOutput();
+ ThrowableObjectOutputStream too = new ThrowableObjectOutputStream(bytesOutput);
+ too.writeObject(discoveryNode);
+ too.close();
+ ThrowableObjectInputStream from = new ThrowableObjectInputStream(new BytesStreamInput(bytesOutput.bytes()));
+ DiscoveryNode readDiscoveryNode = (DiscoveryNode) from.readObject();
+ from.close();
+ assertThat(readDiscoveryNode, Matchers.equalTo(discoveryNode));
+ assertThat(readDiscoveryNode.id(), Matchers.equalTo(id));
+ assertThat(readDiscoveryNode.name(), Matchers.equalTo(nodeName));
+ assertThat(readDiscoveryNode.getHostName(), Matchers.equalTo(hostName));
+ assertThat(readDiscoveryNode.getHostAddress(), Matchers.equalTo(hostAddress));
+ assertThat(readDiscoveryNode.address(), Matchers.equalTo(transportAddress));
+ assertThat(readDiscoveryNode.attributes(), Matchers.equalTo(attributes));
+ assertThat(readDiscoveryNode.version(), Matchers.equalTo(version));
+ }
+ }
+
+}
diff --git a/src/test/java/org/elasticsearch/index/engine/internal/InternalEngineTests.java b/src/test/java/org/elasticsearch/index/engine/internal/InternalEngineTests.java
index 8485a5aae7df2..a7a14535acfe1 100644
--- a/src/test/java/org/elasticsearch/index/engine/internal/InternalEngineTests.java
+++ b/src/test/java/org/elasticsearch/index/engine/internal/InternalEngineTests.java
@@ -22,8 +22,8 @@
import com.google.common.base.Predicate;
import org.apache.log4j.AppenderSkeleton;
import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
import org.apache.log4j.spi.LoggingEvent;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.document.Field;
diff --git a/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryTests.java b/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryTests.java
index 3084a6a58d6b1..a776ef616e71a 100644
--- a/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryTests.java
+++ b/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryTests.java
@@ -20,15 +20,25 @@
package org.elasticsearch.indices.recovery;
import com.carrotsearch.randomizedtesting.LifecycleScope;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
+import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse;
import org.elasticsearch.action.admin.indices.recovery.ShardRecoveryResponse;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.cluster.ClusterService;
+import org.elasticsearch.cluster.metadata.IndexMetaData;
+import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
+import org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider;
import org.elasticsearch.common.settings.ImmutableSettings;
+import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.discovery.DiscoveryService;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.recovery.RecoveryState.Stage;
import org.elasticsearch.indices.recovery.RecoveryState.Type;
@@ -36,16 +46,22 @@
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
import org.elasticsearch.test.junit.annotations.TestLogging;
+import org.elasticsearch.test.store.MockDirectoryHelper;
+import org.elasticsearch.test.transport.MockTransportService;
+import org.elasticsearch.transport.*;
import org.junit.Test;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.*;
/**
@@ -412,4 +428,116 @@ private void validateIndexRecoveryState(RecoveryState.Index indexState) {
assertThat(indexState.percentBytesRecovered(), greaterThanOrEqualTo(0.0f));
assertThat(indexState.percentBytesRecovered(), lessThanOrEqualTo(100.0f));
}
+
+ @Test
+ public void disconnectsWhileRecoveringTest() throws Exception {
+ final String indexName = "test";
+ final Settings nodeSettings = ImmutableSettings.builder()
+ .put(RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_NETWORK, "100ms")
+ .put(RecoverySettings.INDICES_RECOVERY_INTERNAL_ACTION_TIMEOUT, "1s")
+ .put("cluster.routing.schedule", "100ms") // aggressive reroute post shard failures
+ .put(TransportModule.TRANSPORT_SERVICE_TYPE_KEY, MockTransportService.class.getName())
+ .put(MockDirectoryHelper.RANDOM_PREVENT_DOUBLE_WRITE, false) // restarted recoveries will delete temp files and write them again
+ .build();
+ // start a master node
+ internalCluster().startNode(nodeSettings);
+
+ ListenableFuture blueFuture = internalCluster().startNodeAsync(ImmutableSettings.builder().put("node.color", "blue").put(nodeSettings).build());
+ ListenableFuture redFuture = internalCluster().startNodeAsync(ImmutableSettings.builder().put("node.color", "red").put(nodeSettings).build());
+ final String blueNodeName = blueFuture.get();
+ final String redNodeName = redFuture.get();
+
+ ClusterHealthResponse response = client().admin().cluster().prepareHealth().setWaitForNodes(">=3").get();
+ assertThat(response.isTimedOut(), is(false));
+
+
+ client().admin().indices().prepareCreate(indexName)
+ .setSettings(
+ ImmutableSettings.builder()
+ .put(FilterAllocationDecider.INDEX_ROUTING_INCLUDE_GROUP + "color", "blue")
+ .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
+ .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
+ ).get();
+
+ List requests = new ArrayList<>();
+ int numDocs = scaledRandomIntBetween(25, 250);
+ for (int i = 0; i < numDocs; i++) {
+ requests.add(client().prepareIndex(indexName, "type").setCreate(true).setSource("{}"));
+ }
+ indexRandom(true, requests);
+ ensureSearchable(indexName);
+
+ ClusterStateResponse stateResponse = client().admin().cluster().prepareState().get();
+ final String blueNodeId = internalCluster().getInstance(DiscoveryService.class, blueNodeName).localNode().id();
+
+ assertFalse(stateResponse.getState().readOnlyRoutingNodes().node(blueNodeId).isEmpty());
+
+ SearchResponse searchResponse = client().prepareSearch(indexName).get();
+ assertHitCount(searchResponse, numDocs);
+
+ String[] recoveryActions = new String[]{
+ RecoverySource.Actions.START_RECOVERY,
+ RecoveryTarget.Actions.FILES_INFO,
+ RecoveryTarget.Actions.FILE_CHUNK,
+ RecoveryTarget.Actions.CLEAN_FILES,
+ //RecoveryTarget.Actions.TRANSLOG_OPS, <-- may not be sent if already flushed
+ RecoveryTarget.Actions.PREPARE_TRANSLOG,
+ RecoveryTarget.Actions.FINALIZE
+ };
+ final String recoveryActionToBlock = randomFrom(recoveryActions);
+ final boolean dropRequests = randomBoolean();
+ logger.info("--> will {} between blue & red on [{}]", dropRequests ? "drop requests" : "break connection", recoveryActionToBlock);
+
+ MockTransportService blueMockTransportService = (MockTransportService) internalCluster().getInstance(TransportService.class, blueNodeName);
+ MockTransportService redMockTransportService = (MockTransportService) internalCluster().getInstance(TransportService.class, redNodeName);
+ DiscoveryNode redDiscoNode = internalCluster().getInstance(ClusterService.class, redNodeName).localNode();
+ DiscoveryNode blueDiscoNode = internalCluster().getInstance(ClusterService.class, blueNodeName).localNode();
+ final CountDownLatch requestBlocked = new CountDownLatch(1);
+
+ blueMockTransportService.addDelegate(redDiscoNode, new RecoveryActionBlocker(dropRequests, recoveryActionToBlock, blueMockTransportService.original(), requestBlocked));
+ redMockTransportService.addDelegate(blueDiscoNode, new RecoveryActionBlocker(dropRequests, recoveryActionToBlock, redMockTransportService.original(), requestBlocked));
+
+ logger.info("--> starting recovery from blue to red");
+ client().admin().indices().prepareUpdateSettings(indexName).setSettings(
+ ImmutableSettings.builder()
+ .put(FilterAllocationDecider.INDEX_ROUTING_INCLUDE_GROUP + "color", "red,blue")
+ .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
+ ).get();
+
+ requestBlocked.await();
+
+ logger.info("--> stopping to block recovery");
+ blueMockTransportService.clearAllRules();
+ redMockTransportService.clearAllRules();
+
+ ensureGreen();
+ searchResponse = client(redNodeName).prepareSearch(indexName).setPreference("_local").get();
+ assertHitCount(searchResponse, numDocs);
+
+ }
+
+ private class RecoveryActionBlocker extends MockTransportService.DelegateTransport {
+ private final boolean dropRequests;
+ private final String recoveryActionToBlock;
+ private final CountDownLatch requestBlocked;
+
+ public RecoveryActionBlocker(boolean dropRequests, String recoveryActionToBlock, Transport delegate, CountDownLatch requestBlocked) {
+ super(delegate);
+ this.dropRequests = dropRequests;
+ this.recoveryActionToBlock = recoveryActionToBlock;
+ this.requestBlocked = requestBlocked;
+ }
+
+ public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException, TransportException {
+ if (recoveryActionToBlock.equals(action) || requestBlocked.getCount() == 0) {
+ logger.info("--> preventing {} request", action);
+ requestBlocked.countDown();
+ if (dropRequests) {
+ return;
+ }
+ throw new ConnectTransportException(node, "DISCONNECT: prevented " + action + " request");
+ }
+ transport.sendRequest(node, requestId, action, request, options);
+ }
+ }
}
diff --git a/src/test/java/org/elasticsearch/recovery/RecoverySettingsTest.java b/src/test/java/org/elasticsearch/recovery/RecoverySettingsTest.java
new file mode 100644
index 0000000000000..1afeea039d6e4
--- /dev/null
+++ b/src/test/java/org/elasticsearch/recovery/RecoverySettingsTest.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.recovery;
+
+import org.elasticsearch.common.settings.ImmutableSettings;
+import org.elasticsearch.indices.recovery.RecoverySettings;
+import org.elasticsearch.test.ElasticsearchSingleNodeTest;
+import org.junit.Test;
+
+public class RecoverySettingsTest extends ElasticsearchSingleNodeTest {
+
+ @Override
+ protected boolean resetNodeAfterTest() {
+ return true;
+ }
+
+ @Test
+ public void testAllSettingsAreDynamicallyUpdatable() {
+ innerTestSettings(RecoverySettings.INDICES_RECOVERY_FILE_CHUNK_SIZE, randomIntBetween(1, 200), new Validator() {
+ @Override
+ public void validate(RecoverySettings recoverySettings, int expectedValue) {
+ assertEquals(expectedValue, recoverySettings.fileChunkSize().bytesAsInt());
+ }
+ });
+ innerTestSettings(RecoverySettings.INDICES_RECOVERY_TRANSLOG_OPS, randomIntBetween(1, 200), new Validator() {
+ @Override
+ public void validate(RecoverySettings recoverySettings, int expectedValue) {
+ assertEquals(expectedValue, recoverySettings.translogOps());
+ }
+ });
+ innerTestSettings(RecoverySettings.INDICES_RECOVERY_TRANSLOG_SIZE, randomIntBetween(1, 200), new Validator() {
+ @Override
+ public void validate(RecoverySettings recoverySettings, int expectedValue) {
+ assertEquals(expectedValue, recoverySettings.translogSize().bytesAsInt());
+ }
+ });
+ innerTestSettings(RecoverySettings.INDICES_RECOVERY_CONCURRENT_STREAMS, randomIntBetween(1, 200), new Validator() {
+ @Override
+ public void validate(RecoverySettings recoverySettings, int expectedValue) {
+ assertEquals(expectedValue, recoverySettings.concurrentStreamPool().getMaximumPoolSize());
+ }
+ });
+ innerTestSettings(RecoverySettings.INDICES_RECOVERY_CONCURRENT_SMALL_FILE_STREAMS, randomIntBetween(1, 200), new Validator() {
+ @Override
+ public void validate(RecoverySettings recoverySettings, int expectedValue) {
+ assertEquals(expectedValue, recoverySettings.concurrentSmallFileStreamPool().getMaximumPoolSize());
+ }
+ });
+ innerTestSettings(RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC, 0, new Validator() {
+ @Override
+ public void validate(RecoverySettings recoverySettings, int expectedValue) {
+ assertEquals(null, recoverySettings.rateLimiter());
+ }
+ });
+ innerTestSettings(RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_STATE_SYNC, randomIntBetween(1, 200), new Validator() {
+ @Override
+ public void validate(RecoverySettings recoverySettings, int expectedValue) {
+ assertEquals(expectedValue, recoverySettings.retryDelayStateSync().millis());
+ }
+ });
+ innerTestSettings(RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_NETWORK, randomIntBetween(1, 200), new Validator() {
+ @Override
+ public void validate(RecoverySettings recoverySettings, int expectedValue) {
+ assertEquals(expectedValue, recoverySettings.retryDelayNetwork().millis());
+ }
+ });
+ innerTestSettings(RecoverySettings.INDICES_RECOVERY_ACTIVITY_TIMEOUT, randomIntBetween(1, 200), new Validator() {
+ @Override
+ public void validate(RecoverySettings recoverySettings, int expectedValue) {
+ assertEquals(expectedValue, recoverySettings.activityTimeout().millis());
+ }
+ });
+ innerTestSettings(RecoverySettings.INDICES_RECOVERY_INTERNAL_ACTION_TIMEOUT, randomIntBetween(1, 200), new Validator() {
+ @Override
+ public void validate(RecoverySettings recoverySettings, int expectedValue) {
+ assertEquals(expectedValue, recoverySettings.internalActionTimeout().millis());
+ }
+ });
+ innerTestSettings(RecoverySettings.INDICES_RECOVERY_INTERNAL_LONG_ACTION_TIMEOUT, randomIntBetween(1, 200), new Validator() {
+ @Override
+ public void validate(RecoverySettings recoverySettings, int expectedValue) {
+ assertEquals(expectedValue, recoverySettings.internalActionLongTimeout().millis());
+ }
+ });
+
+ innerTestSettings(RecoverySettings.INDICES_RECOVERY_COMPRESS, false, new Validator() {
+ @Override
+ public void validate(RecoverySettings recoverySettings, boolean expectedValue) {
+ assertEquals(expectedValue, recoverySettings.compress());
+ }
+ });
+ }
+
+ private static class Validator {
+ public void validate(RecoverySettings recoverySettings, int expectedValue) {
+ }
+
+ public void validate(RecoverySettings recoverySettings, boolean expectedValue) {
+ }
+ }
+
+ private void innerTestSettings(String key, int newValue, Validator validator) {
+ client().admin().cluster().prepareUpdateSettings().setTransientSettings(ImmutableSettings.builder().put(key, newValue)).get();
+ validator.validate(getInstanceFromNode(RecoverySettings.class), newValue);
+ }
+
+ private void innerTestSettings(String key, boolean newValue, Validator validator) {
+ client().admin().cluster().prepareUpdateSettings().setTransientSettings(ImmutableSettings.builder().put(key, newValue)).get();
+ validator.validate(getInstanceFromNode(RecoverySettings.class), newValue);
+ }
+
+}
diff --git a/src/test/java/org/elasticsearch/recovery/RelocationTests.java b/src/test/java/org/elasticsearch/recovery/RelocationTests.java
index 4168877a508e1..63012cfb4e754 100644
--- a/src/test/java/org/elasticsearch/recovery/RelocationTests.java
+++ b/src/test/java/org/elasticsearch/recovery/RelocationTests.java
@@ -532,9 +532,6 @@ public RecoveryCorruption(Transport transport, CountDownLatch corruptionCount) {
@Override
public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException, TransportException {
-// if (action.equals(RecoveryTarget.Actions.PREPARE_TRANSLOG)) {
-// logger.debug("dropped [{}] to {}", action, node);
- //} else
if (action.equals(RecoveryTarget.Actions.FILE_CHUNK)) {
RecoveryFileChunkRequest chunkRequest = (RecoveryFileChunkRequest) request;
if (chunkRequest.name().startsWith(IndexFileNames.SEGMENTS)) {
@@ -550,5 +547,5 @@ public void sendRequest(DiscoveryNode node, long requestId, String action, Trans
}
}
}
-
}
+
diff --git a/src/test/java/org/elasticsearch/test/InternalTestCluster.java b/src/test/java/org/elasticsearch/test/InternalTestCluster.java
index b037d16ea63ae..2dff9dc4a2edf 100644
--- a/src/test/java/org/elasticsearch/test/InternalTestCluster.java
+++ b/src/test/java/org/elasticsearch/test/InternalTestCluster.java
@@ -47,10 +47,10 @@
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
+import org.elasticsearch.cluster.routing.OperationRouting;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDecider;
import org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider;
-import org.elasticsearch.cluster.routing.OperationRouting;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.io.FileSystemUtils;
@@ -293,14 +293,13 @@ public InternalTestCluster(long clusterSeed,
builder.put(RecoverySettings.INDICES_RECOVERY_CONCURRENT_STREAMS, RandomInts.randomIntBetween(random, 10, 15));
builder.put(RecoverySettings.INDICES_RECOVERY_CONCURRENT_SMALL_FILE_STREAMS, RandomInts.randomIntBetween(random, 10, 15));
builder.put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES, RandomInts.randomIntBetween(random, 5, 10));
- builder.put(RecoverySettings.INDICES_RECOVERY_RETRY_DELAY, TimeValue.timeValueMillis(RandomInts.randomIntBetween(random, 10, 25))); // more shared - we need to retry more often
} else if (random.nextInt(100) <= 90) {
builder.put(RecoverySettings.INDICES_RECOVERY_CONCURRENT_STREAMS, RandomInts.randomIntBetween(random, 3, 6));
builder.put(RecoverySettings.INDICES_RECOVERY_CONCURRENT_SMALL_FILE_STREAMS, RandomInts.randomIntBetween(random, 3, 6));
builder.put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES, RandomInts.randomIntBetween(random, 2, 5));
}
// always reduce this - it can make tests really slow
- builder.put(RecoverySettings.INDICES_RECOVERY_RETRY_DELAY, TimeValue.timeValueMillis(RandomInts.randomIntBetween(random, 20, 50)));
+ builder.put(RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_STATE_SYNC, TimeValue.timeValueMillis(RandomInts.randomIntBetween(random, 20, 50)));
defaultSettings = builder.build();
executor = EsExecutors.newCached(0, TimeUnit.SECONDS, EsExecutors.daemonThreadFactory("test_" + clusterName));
}