[STORE] Move to one data.path per shard
This commit moves away from using a striped RAID-0 simulation across multiple
data paths towards using a single path per shard. Multiple data paths are still
supported, but a shard and its data are no longer striped across multiple paths / disks.
This will, for instance, prevent losing all shards if a single disk is corrupted.

Indices that already use this feature will automatically be upgraded to a single
data path based on a simple disk-space heuristic. In general there must be enough
disk space to move a single shard at any time, otherwise the upgrade will fail.

Closes elastic#9498
s1monw committed Apr 20, 2015
1 parent 2b04403 commit 91488c5
Showing 56 changed files with 1,230 additions and 1,814 deletions.
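For illustration, a minimal sketch of the disk-space heuristic described in the commit message, assuming a hypothetical pickTargetPath helper; the real upgrade logic lives in the new MultiDataPathUpgrader class added by this commit:

import java.io.IOException;
import java.nio.file.FileStore;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper, not part of this commit: pick the data path with the most
// usable space and fail if it cannot hold the shard that is about to be moved.
final class UpgradeTargetPicker {
    static Path pickTargetPath(Path[] dataPaths, long shardSizeInBytes) throws IOException {
        Path best = null;
        long bestFree = -1;
        for (Path dataPath : dataPaths) {
            FileStore store = Files.getFileStore(dataPath);
            long free = store.getUsableSpace();
            if (free > bestFree) {
                bestFree = free;
                best = dataPath;
            }
        }
        if (best == null || bestFree < shardSizeInBytes) {
            throw new IOException("not enough disk space to move the shard to a single data path");
        }
        return best;
    }
}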
375 changes: 375 additions & 0 deletions src/main/java/org/elasticsearch/common/util/MultiDataPathUpgrader.java

Large diffs are not rendered by default.

54 changes: 26 additions & 28 deletions src/main/java/org/elasticsearch/env/NodeEnvironment.java
@@ -21,23 +21,19 @@

import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import com.google.common.primitives.Ints;

import org.apache.lucene.store.*;
import org.apache.lucene.util.Constants;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.io.PathUtils;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.ShardId;
@@ -86,6 +82,20 @@ public NodePath(Path path) throws IOException {
}
this.spins = spins;
}

/**
* Resolves the given shards directory against this NodePath
*/
public Path resolve(ShardId shardId) {
return resolve(shardId.index()).resolve(Integer.toString(shardId.id()));
}

/**
* Resolves the given indexes directory against this NodePath
*/
public Path resolve(Index index) {
return indicesPath.resolve(index.name());
}
}

private final NodePath[] nodePaths;
@@ -313,7 +323,7 @@ private static String getMountPoint(FileStore store) {
public void deleteShardDirectorySafe(ShardId shardId, @IndexSettings Settings indexSettings) throws IOException {
// This is to ensure someone doesn't use ImmutableSettings.EMPTY
assert indexSettings != ImmutableSettings.EMPTY;
final Path[] paths = shardPaths(shardId);
final Path[] paths = availableShardPaths(shardId);
logger.trace("deleting shard {} directory, paths: [{}]", shardId, paths);
try (ShardLock lock = shardLock(shardId)) {
deleteShardDirectoryUnderLock(lock, indexSettings);
@@ -330,7 +340,7 @@ public void deleteShardDirectoryUnderLock(ShardLock lock, @IndexSettings Setting
assert indexSettings != ImmutableSettings.EMPTY;
final ShardId shardId = lock.getShardId();
assert isShardLocked(shardId) : "shard " + shardId + " is not locked";
final Path[] paths = shardPaths(shardId);
final Path[] paths = availableShardPaths(shardId);
IOUtils.rm(paths);
if (hasCustomDataPath(indexSettings)) {
Path customLocation = resolveCustomLocation(indexSettings, shardId);
@@ -575,7 +585,7 @@ public Path[] nodeDataPaths() {
}

/**
* Returns an array of all of the {@link #NodePath}s.
* Returns an array of all of the {@link NodePath}s.
*/
public NodePath[] nodePaths() {
assert assertEnvIsLocked();
@@ -598,36 +608,24 @@ public Path[] indexPaths(Index index) {
}

/**
* Returns all paths where lucene data will be stored, if a index.data_path
* setting is present, will return the custom data path to be used
*/
public Path[] shardDataPaths(ShardId shardId, @IndexSettings Settings indexSettings) {
assert indexSettings != ImmutableSettings.EMPTY;
assert assertEnvIsLocked();
if (hasCustomDataPath(indexSettings)) {
return new Path[] {resolveCustomLocation(indexSettings, shardId)};
} else {
return shardPaths(shardId);
}
}

/**
* Returns all shard paths excluding custom shard path
* Returns all shard paths excluding custom shard path. Note: Shards are only allocated on one of the
* returned paths. The returned array may contain paths to non-existing directories.
*
* @see #hasCustomDataPath(org.elasticsearch.common.settings.Settings)
* @see #resolveCustomLocation(org.elasticsearch.common.settings.Settings, org.elasticsearch.index.shard.ShardId)
*
*/
public Path[] shardPaths(ShardId shardId) {
public Path[] availableShardPaths(ShardId shardId) {
assert assertEnvIsLocked();
final NodePath[] nodePaths = nodePaths();
final Path[] shardLocations = new Path[nodePaths.length];
for (int i = 0; i < nodePaths.length; i++) {
// TODO: wtf with resolve(get())
shardLocations[i] = nodePaths[i].path.resolve(PathUtils.get(INDICES_FOLDER,
shardId.index().name(),
Integer.toString(shardId.id())));
shardLocations[i] = nodePaths[i].resolve(shardId);
}
return shardLocations;
}

public Set<String> findAllIndices() throws Exception {
public Set<String> findAllIndices() throws IOException {
if (nodePaths == null || locks == null) {
throw new ElasticsearchIllegalStateException("node is not configured to store local location");
}
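As a caller-side illustration of the renamed availableShardPaths(ShardId) and the new NodePath.resolve(ShardId) above, a hedged sketch (the helper class is hypothetical, not part of this commit):

import java.nio.file.Files;
import java.nio.file.Path;

import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.shard.ShardId;

// Hypothetical caller, not part of this commit: after the change, only one of the
// returned candidate paths actually holds the shard's data; the others may not exist.
final class ShardPathLookup {
    static Path findExistingShardPath(NodeEnvironment nodeEnv, ShardId shardId) {
        for (Path candidate : nodeEnv.availableShardPaths(shardId)) {
            if (Files.isDirectory(candidate)) {
                return candidate; // the single data path this shard was allocated to
            }
        }
        return null; // no local data for this shard on this node
    }
}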
@@ -28,7 +28,6 @@
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.env.NodeEnvironment;

import java.util.List;
import java.util.Map;
import java.util.Set;

10 changes: 6 additions & 4 deletions src/main/java/org/elasticsearch/gateway/GatewayAllocator.java
@@ -28,6 +28,7 @@
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.MutableShardRouting;
@@ -106,6 +107,7 @@ public boolean allocateUnassigned(RoutingAllocation allocation) {
RoutingNodes routingNodes = allocation.routingNodes();

// First, handle primaries, they must find a place to be allocated on here
MetaData metaData = routingNodes.metaData();
Iterator<MutableShardRouting> unassignedIterator = routingNodes.unassigned().iterator();
while (unassignedIterator.hasNext()) {
MutableShardRouting shard = unassignedIterator.next();
@@ -118,8 +120,8 @@ public boolean allocateUnassigned(RoutingAllocation allocation) {
if (!routingNodes.routingTable().index(shard.index()).shard(shard.id()).primaryAllocatedPostApi()) {
continue;
}
final String indexUUID = allocation.metaData().index(shard.index()).getUUID();
ObjectLongOpenHashMap<DiscoveryNode> nodesState = buildShardStates(nodes, shard, indexUUID);

ObjectLongOpenHashMap<DiscoveryNode> nodesState = buildShardStates(nodes, shard, metaData.index(shard.index()));

int numberOfAllocationsFound = 0;
long highestVersion = -1;
@@ -370,7 +372,7 @@ public boolean allocateUnassigned(RoutingAllocation allocation) {
return changed;
}

private ObjectLongOpenHashMap<DiscoveryNode> buildShardStates(final DiscoveryNodes nodes, MutableShardRouting shard, String indexUUID) {
private ObjectLongOpenHashMap<DiscoveryNode> buildShardStates(final DiscoveryNodes nodes, MutableShardRouting shard, IndexMetaData indexMetaData) {
ObjectLongOpenHashMap<DiscoveryNode> shardStates = cachedShardsState.get(shard.shardId());
ObjectOpenHashSet<String> nodeIds;
if (shardStates == null) {
Expand Down Expand Up @@ -399,7 +401,7 @@ public boolean apply(DiscoveryNode node) {
}

String[] nodesIdsArray = nodeIds.toArray(String.class);
TransportNodesListGatewayStartedShards.NodesGatewayStartedShards response = listGatewayStartedShards.list(shard.shardId(), indexUUID, nodesIdsArray, listTimeout).actionGet();
TransportNodesListGatewayStartedShards.NodesGatewayStartedShards response = listGatewayStartedShards.list(shard.shardId(), indexMetaData.getUUID(), nodesIdsArray, listTimeout).actionGet();
logListActionFailures(shard, "state", response.failures());

for (TransportNodesListGatewayStartedShards.NodeGatewayStartedShards nodeShardState : response) {
@@ -37,7 +37,7 @@
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.common.util.MultiDataPathUpgrader;
import org.elasticsearch.env.NodeEnvironment;

import java.io.IOException;
@@ -71,6 +71,7 @@ public GatewayMetaState(Settings settings, NodeEnvironment nodeEnv, MetaStateSer

if (DiscoveryNode.dataNode(settings)) {
ensureNoPre019ShardState(nodeEnv);
MultiDataPathUpgrader.upgradeMultiDataPath(nodeEnv, logger);
}

if (DiscoveryNode.masterNode(settings) || DiscoveryNode.dataNode(settings)) {
@@ -117,7 +117,7 @@ protected NodeGatewayStartedShards nodeOperation(NodeRequest request) throws Ela
final ShardId shardId = request.getShardId();
final String indexUUID = request.getIndexUUID();
logger.trace("{} loading local shard state info", shardId);
final ShardStateMetaData shardStateMetaData = ShardStateMetaData.FORMAT.loadLatestState(logger, nodeEnv.shardPaths(request.shardId));
ShardStateMetaData shardStateMetaData = ShardStateMetaData.FORMAT.loadLatestState(logger, nodeEnv.availableShardPaths(request.shardId));
if (shardStateMetaData != null) {
// old shard metadata doesn't have the actual index UUID so we need to check if the actual uuid in the metadata
// is equal to IndexMetaData.INDEX_UUID_NA_VALUE otherwise this shard doesn't belong to the requested index.
@@ -155,6 +155,7 @@ public Request(ShardId shardId, String indexUUID, String[] nodesIds) {
this.indexUUID = indexUUID;
}


public ShardId shardId() {
return this.shardId;
}
18 changes: 12 additions & 6 deletions src/main/java/org/elasticsearch/index/IndexService.java
@@ -58,10 +58,7 @@
import org.elasticsearch.index.search.stats.ShardSearchModule;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.settings.IndexSettingsService;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardCreationException;
import org.elasticsearch.index.shard.IndexShardModule;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.*;
import org.elasticsearch.index.similarity.SimilarityService;
import org.elasticsearch.index.snapshots.IndexShardSnapshotModule;
import org.elasticsearch.index.store.IndexStore;
@@ -296,6 +293,15 @@ public synchronized IndexShard createShard(int sShardId, boolean primary) throws
boolean success = false;
Injector shardInjector = null;
try {

ShardPath path = ShardPath.loadShardPath(logger, nodeEnv, shardId, indexSettings);
if (path == null) {
path = ShardPath.selectNewPathForShard(nodeEnv, shardId, indexSettings);
logger.debug("{} creating using a new path [{}]", shardId, path);
} else {
logger.debug("{} creating using an existing path [{}]", shardId, path);
}

lock = nodeEnv.shardLock(shardId, TimeUnit.SECONDS.toMillis(5));
if (shards.containsKey(shardId.id())) {
throw new IndexShardAlreadyExistsException(shardId + " already exists");
@@ -313,8 +319,8 @@ public synchronized IndexShard createShard(int sShardId, boolean primary) throws
modules.add(new ShardIndexingModule());
modules.add(new ShardSearchModule());
modules.add(new ShardGetModule());
modules.add(new StoreModule(indexSettings, injector.getInstance(IndexStore.class), lock,
new StoreCloseListener(shardId, canDeleteShardContent)));
modules.add(new StoreModule(indexSettings, injector.getInstance(IndexStore.class).shardDirectory(), lock,
new StoreCloseListener(shardId, canDeleteShardContent), path));
modules.add(new DeletionPolicyModule(indexSettings));
modules.add(new MergePolicyModule(indexSettings));
modules.add(new MergeSchedulerModule(indexSettings));
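The load-or-select pattern wired into createShard above, extracted as a self-contained sketch; the ShardPath method signatures are taken from the diff, while the ESLogger parameter type and the checked IOException are assumptions:

import java.io.IOException;

import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardPath;

// Sketch only: reuse the shard's existing on-disk path if one is found,
// otherwise let ShardPath pick a fresh data path for the new shard.
final class ShardPathResolver {
    static ShardPath loadOrCreate(ESLogger logger, NodeEnvironment nodeEnv,
                                  ShardId shardId, Settings indexSettings) throws IOException {
        ShardPath path = ShardPath.loadShardPath(logger, nodeEnv, shardId, indexSettings);
        if (path == null) {
            path = ShardPath.selectNewPathForShard(nodeEnv, shardId, indexSettings);
            logger.debug("{} creating using a new path [{}]", shardId, path);
        } else {
            logger.debug("{} creating using an existing path [{}]", shardId, path);
        }
        return path;
    }
}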
@@ -19,7 +19,6 @@

package org.elasticsearch.index.gateway;

import com.google.common.collect.Sets;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.SegmentInfos;
@@ -33,7 +32,6 @@
import org.elasticsearch.common.util.CancellableThreads;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineException;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
16 changes: 9 additions & 7 deletions src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -123,8 +123,6 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import static org.elasticsearch.index.mapper.SourceToParse.source;

/**
*
*/
@@ -195,14 +193,15 @@ public class IndexShard extends AbstractIndexShardComponent {
* This setting is realtime updateable.
*/
public static final String INDEX_FLUSH_ON_CLOSE = "index.flush_on_close";
private final ShardPath path;

@Inject
public IndexShard(ShardId shardId, IndexSettingsService indexSettingsService, IndicesLifecycle indicesLifecycle, Store store, MergeSchedulerProvider mergeScheduler, Translog translog,
ThreadPool threadPool, MapperService mapperService, IndexQueryParserService queryParserService, IndexCache indexCache, IndexAliasesService indexAliasesService, ShardIndexingService indexingService, ShardGetService getService, ShardSearchService searchService, ShardIndexWarmerService shardWarmerService,
ShardFilterCache shardFilterCache, ShardFieldData shardFieldData, PercolatorQueriesRegistry percolatorQueriesRegistry, ShardPercolateService shardPercolateService, CodecService codecService,
ShardTermVectorsService termVectorsService, IndexFieldDataService indexFieldDataService, IndexService indexService, ShardSuggestService shardSuggestService, ShardQueryCache shardQueryCache, ShardBitsetFilterCache shardBitsetFilterCache,
@Nullable IndicesWarmer warmer, SnapshotDeletionPolicy deletionPolicy, SimilarityService similarityService, MergePolicyProvider mergePolicyProvider, EngineFactory factory,
ClusterService clusterService, NodeEnvironment nodeEnv) {
ClusterService clusterService, NodeEnvironment nodeEnv, ShardPath path) {
super(shardId, indexSettingsService.getSettings());
this.codecService = codecService;
this.warmer = warmer;
@@ -244,8 +243,8 @@ public IndexShard(ShardId shardId, IndexSettingsService indexSettingsService, In
this.flushOnClose = indexSettings.getAsBoolean(INDEX_FLUSH_ON_CLOSE, true);
this.nodeEnv = nodeEnv;
indexSettingsService.addListener(applyRefreshSettings);

this.mapperAnalyzer = new MapperAnalyzer(mapperService);
this.path = path;
/* create engine config */

logger.debug("state: [CREATED]");
@@ -997,7 +996,10 @@ public void deleteShardState() throws IOException {
if (this.routingEntry() != null && this.routingEntry().active()) {
throw new ElasticsearchIllegalStateException("Can't delete shard state on an active shard");
}
MetaDataStateFormat.deleteMetaState(nodeEnv.shardPaths(shardId));
MetaDataStateFormat.deleteMetaState(shardPath().getDataPath());
}
public ShardPath shardPath() {
return path;
}

private class ApplyRefreshSettings implements IndexSettingsService.Listener {
@@ -1200,7 +1202,7 @@ class ShardEngineFailListener implements Engine.FailedEngineListener {
public void onFailedEngine(ShardId shardId, String reason, @Nullable Throwable failure) {
try {
// delete the shard state so this folder will not be reused
MetaDataStateFormat.deleteMetaState(nodeEnv.shardPaths(shardId));
MetaDataStateFormat.deleteMetaState(nodeEnv.availableShardPaths(shardId));
} catch (IOException e) {
logger.warn("failed to delete shard state", e);
} finally {
@@ -1258,7 +1260,7 @@ void persistMetadata(ShardRouting newRouting, ShardRouting currentRouting) {
}
final ShardStateMetaData newShardStateMetadata = new ShardStateMetaData(newRouting.version(), newRouting.primary(), getIndexUUID());
logger.trace("{} writing shard state, reason [{}]", shardId, writeReason);
ShardStateMetaData.FORMAT.write(newShardStateMetadata, newShardStateMetadata.version, nodeEnv.shardPaths(shardId));
ShardStateMetaData.FORMAT.write(newShardStateMetadata, newShardStateMetadata.version, shardPath().getShardStatePath());
} catch (IOException e) { // this is how we used to handle it.... :(
logger.warn("failed to write shard state", e);
// we failed to write the shard state, we will try and write
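Side by side, the shard-state persistence change above reduces to this before/after, with both calls copied from the diff (the surrounding variables are assumed to be in scope as in IndexShard):

// Before this commit: shard state metadata was written against every candidate data path for the shard.
ShardStateMetaData.FORMAT.write(newShardStateMetadata, newShardStateMetadata.version, nodeEnv.shardPaths(shardId));

// After this commit: shard state metadata is written only to the shard's own state path.
ShardStateMetaData.FORMAT.write(newShardStateMetadata, newShardStateMetadata.version, shardPath().getShardStatePath());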
@@ -20,11 +20,9 @@

import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.aliases.IndexAliasesService;
@@ -48,7 +46,6 @@
import org.elasticsearch.index.percolator.stats.ShardPercolateService;
import org.elasticsearch.index.query.IndexQueryParserService;
import org.elasticsearch.index.search.stats.ShardSearchService;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.settings.IndexSettingsService;
import org.elasticsearch.index.similarity.SimilarityService;
import org.elasticsearch.index.store.Store;
@@ -82,14 +79,14 @@ public ShadowIndexShard(ShardId shardId, IndexSettingsService indexSettingsServi
IndexService indexService, ShardSuggestService shardSuggestService, ShardQueryCache shardQueryCache,
ShardBitsetFilterCache shardBitsetFilterCache, @Nullable IndicesWarmer warmer,
SnapshotDeletionPolicy deletionPolicy, SimilarityService similarityService,
MergePolicyProvider mergePolicyProvider, EngineFactory factory, ClusterService clusterService, NodeEnvironment nodeEnv) {
MergePolicyProvider mergePolicyProvider, EngineFactory factory, ClusterService clusterService, NodeEnvironment nodeEnv, ShardPath path) {
super(shardId, indexSettingsService, indicesLifecycle, store, mergeScheduler,
translog, threadPool, mapperService, queryParserService, indexCache, indexAliasesService,
indexingService, getService, searchService, shardWarmerService, shardFilterCache,
shardFieldData, percolatorQueriesRegistry, shardPercolateService, codecService,
termVectorsService, indexFieldDataService, indexService, shardSuggestService,
shardQueryCache, shardBitsetFilterCache, warmer, deletionPolicy, similarityService,
mergePolicyProvider, factory, clusterService, nodeEnv);
mergePolicyProvider, factory, clusterService, nodeEnv, path);
}

/**
