+package org.elasticsearch.common.util;
+import com.google.common.base.Charsets;
+import org.apache.lucene.index.CheckIndex;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.Lock;
+import org.apache.lucene.store.SimpleFSDirectory;
+import org.apache.lucene.util.IOUtils;
+import org.elasticsearch.ElasticsearchIllegalStateException;
+import org.elasticsearch.cluster.metadata.IndexMetaData;
+import org.elasticsearch.common.io.FileSystemUtils;
+import org.elasticsearch.common.io.stream.BytesStreamOutput;
+import org.elasticsearch.common.logging.ESLogger;
+import org.elasticsearch.common.logging.Loggers;
+import org.elasticsearch.common.settings.ImmutableSettings;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.env.NodeEnvironment;
+import org.elasticsearch.gateway.MetaDataStateFormat;
+import org.elasticsearch.index.settings.IndexSettings;
+import org.elasticsearch.index.shard.*;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.nio.file.*;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.ArrayList;
+import java.util.List;
+ */
+public class MultiDataPathUpgrader {
+ private final NodeEnvironment nodeEnvironment;
+ private final ESLogger logger = Loggers.getLogger(getClass());
+ public MultiDataPathUpgrader(NodeEnvironment nodeEnvironment) {
+ this.nodeEnvironment = nodeEnvironment;
+ }
+ /**
+ * Upgrades the given shard Id from multiple shard paths into the given target path.
+ *
+ * @see #pickShardPath(org.elasticsearch.index.shard.ShardId, org.elasticsearch.common.settings.Settings)
+ */
+ public void upgrade(ShardId shard, ShardPath targetPath) throws IOException {
+ final Path[] paths = nodeEnvironment.shardPaths(shard); // custom data path doesn't need upgrading
+ if (isTargetPathConfigured(paths, targetPath) == false) {
+ throw new IllegalArgumentException("shard path must be one of the shards data paths");
+ }
+ if (paths.length == 1) {
+ logger.info("{} only one data path configured - skipping upgrade", shard);
+ return;
+ }
+ logger.info("{} upgrading multi data dir to {}", shard, targetPath.getDataPath());
+ final ShardStateMetaData loaded = ShardStateMetaData.FORMAT.loadLatestState(logger, paths);
+ if (loaded == null) {
+ throw new IllegalStateException("Can't upgrade shard without shard state");
+ }
+ logger.info("{} load shard state {}", shard, loaded);
+ ShardStateMetaData.FORMAT.write(loaded, loaded.version, targetPath.getShardStatePath());
+ Files.createDirectories(targetPath.resolveIndex());
+ try (SimpleFSDirectory directory = new SimpleFSDirectory(targetPath.resolveIndex())) {
+ try (final Lock lock = directory.makeLock(IndexWriter.WRITE_LOCK_NAME)) {
+ if (lock.obtain(5000)) {
+ upgradeFiles(shard, targetPath, targetPath.resolveIndex(), ShardPath.INDEX_FOLDER_NAME, paths);
+ } else {
+ throw new IllegalStateException("Can't obtain lock on " + targetPath.resolveIndex());
+ }
+ }
+ }
+ upgradeFiles(shard, targetPath, targetPath.resolveTranslog(), ShardPath.TRANSLOG_FOLDER_NAME, paths);
+ logger.info("{} wipe old state files", shard);
+ for (Path path : paths) {
+ if (path.equals(targetPath.getShardStatePath()) == false) {
+ logger.info("{} wipe state file in {}", shard, path);
+ MetaDataStateFormat.deleteMetaState(path);
+ }
+ }
+ if (FileSystemUtils.files(targetPath.resolveIndex()).length == 0) {
+ throw new IllegalStateException("index folder [" + targetPath.resolveIndex() + "] is empty");
+ }
+ if (FileSystemUtils.files(targetPath.resolveTranslog()).length == 0) {
+ throw new IllegalStateException("translog folder [" + targetPath.resolveTranslog() + "] is empty");
+ }
+ }
+ /**
+ * Runs check-index on the target shard and throws an exception if it failed
+ */
+ public void checkIndex(ShardPath targetPath) throws IOException {
+ BytesStreamOutput os = new BytesStreamOutput();
+ PrintStream out = new PrintStream(os, false, Charsets.UTF_8.name());
+ try (Directory directory = new SimpleFSDirectory(targetPath.resolveIndex());
+ CheckIndex checkIndex = new CheckIndex(directory)) {
+ checkIndex.setInfoStream(out);
+ CheckIndex.Status status = checkIndex.checkIndex();
+ out.flush();
+ if (!status.clean) {
+ logger.warn("check index [failure]\n{}", new String(os.bytes().toBytes(), Charsets.UTF_8));
+ throw new ElasticsearchIllegalStateException("index check failure");
+ }
+ }
+ }
+ /**
+ * Returns true iff the given shard needs upgrading.
+ */
+ public boolean needsUpgrading(ShardId shard, @IndexSettings Settings settings) {
+ final Path[] paths = nodeEnvironment.shardPaths(shard);
+ // custom data path doesn't need upgrading neither single path envs
+ if (NodeEnvironment.hasCustomDataPath(settings) == false && paths.length > 1) {
+ int numPathsExist = 0;
+ for (Path path : paths) {
+ if (Files.exists(path.resolve(MetaDataStateFormat.STATE_DIR_NAME))) {
+ numPathsExist++;
+ }
+ if (numPathsExist > 1) {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+ /**
+ * Picks a target ShardPath to allocate and upgrade the given shard to. It picks the target based on a simple
+ * heuristic:
+ *
+ * - if the smallest datapath has 2x more space available that the shards total size the datapath with the most bytes for that shard is picked to minimize the amount of bytes to copy
+ * - otherwise the largest available datapath is used as the target no matter how big of a slice of the shard it already holds.
+ *
+ */
+ public ShardPath pickShardPath(ShardId shard, @IndexSettings Settings settings) throws IOException {
+ if (needsUpgrading(shard, settings) == false) {
+ throw new IllegalStateException("Shard doesn't need upgrading");
+ }
+ final Path[] paths = nodeEnvironment.shardPaths(shard);
+ // if we need upgradeing make sure we have all paths.
+ for (Path path : paths) {
+ Files.createDirectories(path);
+ }
+ final ShardFileInfo[] shardFileInfo = getShardFileInfo(shard);
+ long totalBytesUsedByShard = 0;
+ long leastUsableSpace = Long.MAX_VALUE;
+ long mostUsableSpace = Long.MIN_VALUE;
+ assert shardFileInfo.length == nodeEnvironment.shardPaths(shard).length;
+ for (ShardFileInfo info : shardFileInfo) {
+ totalBytesUsedByShard += info.spaceUsedByShard;
+ leastUsableSpace = Math.min(leastUsableSpace, info.usableSpace + info.spaceUsedByShard);
+ mostUsableSpace = Math.max(mostUsableSpace, info.usableSpace + info.spaceUsedByShard);
+ }
+ if (mostUsableSpace < totalBytesUsedByShard) {
+ throw new IllegalStateException("Can't upgrade path available space: " + mostUsableSpace + " bytes required space: " + totalBytesUsedByShard + " bytes");
+ }
+ ShardFileInfo target = shardFileInfo[0];
+ if (leastUsableSpace >= (2 * totalBytesUsedByShard)) {
+ for (ShardFileInfo info : shardFileInfo) {
+ if (info.spaceUsedByShard > target.spaceUsedByShard) {
+ target = info;
+ }
+ }
+ } else {
+ for (ShardFileInfo info : shardFileInfo) {
+ if (info.usableSpace > target.usableSpace) {
+ target = info;
+ }
+ }
+ }
+ return new ShardPath(target.path, target.path, settings.get(IndexMetaData.SETTING_UUID), shard);
+ }
+ private ShardFileInfo[] getShardFileInfo(ShardId shard) throws IOException {
+ final Path[] paths = nodeEnvironment.shardPaths(shard); // custom data path doesn't need upgrading
+ final ShardFileInfo[] info = new ShardFileInfo[paths.length];
+ for (int i = 0; i < info.length; i++) {
+ Path path = paths[i];
+ final long usableSpace = getUsabelSpace(path);
+ info[i] = new ShardFileInfo(path, usableSpace, getSpaceUsedByShard(path));
+ }
+ return info;
+ }
+ protected long getSpaceUsedByShard(Path path) throws IOException {
+ final long[] spaceUsedByShard = new long[] {0};
+ if (Files.exists(path)) {
+ Files.walkFileTree(path, new FileVisitor() {
+ @Override
+ public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException {
+ return FileVisitResult.CONTINUE;
+ }
+ @Override
+ public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
+ if (attrs.isRegularFile()) {
+ spaceUsedByShard[0] += attrs.size();
+ }
+ return FileVisitResult.CONTINUE;
+ }
+ @Override
+ public FileVisitResult visitFileFailed(Path file, IOException exc) throws IOException {
+ return FileVisitResult.CONTINUE;
+ }
+ @Override
+ public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
+ return FileVisitResult.CONTINUE;
+ }
+ });
+ }
+ return spaceUsedByShard[0];
+ }
+ protected long getUsabelSpace(Path path) throws IOException {
+ FileStore fileStore = Files.getFileStore(path);
+ return fileStore.getUsableSpace();
+ }
+ static class ShardFileInfo {
+ final Path path;
+ final long usableSpace;
+ final long spaceUsedByShard;
+ ShardFileInfo(Path path, long usableSpace, long spaceUsedByShard) {
+ this.path = path;
+ this.usableSpace = usableSpace;
+ this.spaceUsedByShard = spaceUsedByShard;
+ }
+ }
+ private void upgradeFiles(ShardId shard, ShardPath targetPath, final Path targetDir, String folderName, Path[] paths) throws IOException {
+ List movedFiles = new ArrayList<>();
+ for (Path path : paths) {
+ if (path.equals(targetPath.getDataPath()) == false) {
+ final Path sourceDir = path.resolve(folderName);
+ if (Files.exists(sourceDir)) {
+ logger.info("{} upgrading {} path {} ", shard, folderName, sourceDir);
+ try (DirectoryStream stream = Files.newDirectoryStream(sourceDir)) {
+ Files.createDirectories(targetDir);
+ for (Path file : stream) {
+ if (IndexWriter.WRITE_LOCK_NAME.equals(file.getFileName().toString())) {
+ continue; // skip write.lock
+ }
+ logger.info("{} move file [{}] size: [{}]", shard, file.getFileName(), Files.size(file));
+ final Path targetFile = targetDir.resolve(file.getFileName());
+ Files.move(file, targetFile);
+ movedFiles.add(targetFile);
+ }
+ }
+ }
+ }
+ }
+ if (movedFiles.isEmpty() == false) {
+ // fsync later it might be on disk already
+ logger.info("{} fsync files", shard);
+ for (Path moved : movedFiles) {
+ logger.info("{} syncing {}", shard, moved.getFileName());
+ IOUtils.fsync(moved, false);
+ }
+ logger.info("{} syncing directory {}", shard, targetDir);
+ IOUtils.fsync(targetDir, true);
+ }
+ }
+ /**
+ * Returns true
iff the target path is one of the given paths.
+ */
+ private boolean isTargetPathConfigured(final Path[] paths, ShardPath targetPath) {
+ for (Path path : paths) {
+ if (path.equals(targetPath.getDataPath())) {
+ return true;
+ }
+ }
+ return false;
+ }
return indexPaths;
* Returns all shard paths excluding custom shard path
+ *
+ * @see #hasCustomDataPath(org.elasticsearch.common.settings.Settings)
+ * @see #resolveCustomLocation(org.elasticsearch.common.settings.Settings, org.elasticsearch.index.shard.ShardId)
+ *
public Path[] shardPaths(ShardId shardId) {
assert assertEnvIsLocked();
@@ -640,7 +630,6 @@ public static boolean hasCustomDataPath(@IndexSettings Settings indexSettings) {
* @param indexSettings settings for the index
private Path resolveCustomLocation(@IndexSettings Settings indexSettings) {
- assert indexSettings != ImmutableSettings.EMPTY;
String customDataDir = indexSettings.get(IndexMetaData.SETTING_DATA_PATH);
if (customDataDir != null) {
// This assert is because this should be caught by MetaDataCreateIndexService
+package org.elasticsearch.env;
+import org.elasticsearch.index.shard.ShardId;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.LinkOption;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+ */
+public class PathSelector {
+ public Path selectShardDataPath(ShardId shardId, final Path[] dataPaths) throws IOException {
+ for (int i = 0; i < dataPaths.length; i++) {
+ if (doesShardExists(shardId, dataPaths[i])) {
+ return dataPaths[i];
+ }
+ }
+ Path maxUseablePath = dataPaths[0];
+ long maxUseableSpace = Files.getFileStore(maxUseablePath).getUsableSpace();
+ if (dataPaths.length > 1) {
+ for (Path path : dataPaths) {
+ long usableSpace = Files.getFileStore(path).getUsableSpace();
+ if (maxUseableSpace < usableSpace) {
+ maxUseableSpace = usableSpace;
+ maxUseablePath = path;
+ }
+ }
+ }
+ return maxUseablePath;
+ }
+ public boolean doesShardExists(ShardId shardId, Path path) {
+ Path location = path.resolve(Paths.get(NodeEnvironment.INDICES_FOLDER,
+ shardId.index().name(),
+ Integer.toString(shardId.id())));
+ return Files.exists(location, LinkOption.NOFOLLOW_LINKS);
+ }
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.cluster.metadata.IndexMetaData;
+import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.MutableShardRouting;
@@ -106,6 +107,7 @@ public boolean allocateUnassigned(RoutingAllocation allocation) {
RoutingNodes routingNodes = allocation.routingNodes();
// First, handle primaries, they must find a place to be allocated on here
+ MetaData metaData = routingNodes.metaData();
Iterator unassignedIterator = routingNodes.unassigned().iterator();
while (unassignedIterator.hasNext()) {
MutableShardRouting shard = unassignedIterator.next();
@@ -118,8 +120,8 @@ public boolean allocateUnassigned(RoutingAllocation allocation) {
if (!routingNodes.routingTable().index(shard.index()).shard(shard.id()).primaryAllocatedPostApi()) {
- final String indexUUID = allocation.metaData().index(shard.index()).getUUID();
- ObjectLongOpenHashMap nodesState = buildShardStates(nodes, shard, indexUUID);
+ ObjectLongOpenHashMap nodesState = buildShardStates(nodes, shard, metaData.index(shard.index()));
int numberOfAllocationsFound = 0;
long highestVersion = -1;
@@ -370,7 +372,7 @@ public boolean allocateUnassigned(RoutingAllocation allocation) {
return changed;
- private ObjectLongOpenHashMap buildShardStates(final DiscoveryNodes nodes, MutableShardRouting shard, String indexUUID) {
+ private ObjectLongOpenHashMap buildShardStates(final DiscoveryNodes nodes, MutableShardRouting shard, IndexMetaData indexMetaData) {
ObjectLongOpenHashMap shardStates = cachedShardsState.get(shard.shardId());
ObjectOpenHashSet nodeIds;
if (shardStates == null) {
@@ -399,7 +401,7 @@ public boolean apply(DiscoveryNode node) {
String[] nodesIdsArray = nodeIds.toArray(String.class);
- TransportNodesListGatewayStartedShards.NodesGatewayStartedShards response = listGatewayStartedShards.list(shard.shardId(), indexUUID, nodesIdsArray, listTimeout).actionGet();
+ TransportNodesListGatewayStartedShards.NodesGatewayStartedShards response = listGatewayStartedShards.list(shard.shardId(), indexMetaData.getUUID(), nodesIdsArray, listTimeout).actionGet();
logListActionFailures(shard, "state", response.failures());
for (TransportNodesListGatewayStartedShards.NodeGatewayStartedShards nodeShardState : response) {
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.util.MultiDataPathUpgrader;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.env.NodeEnvironment;
+import org.elasticsearch.env.ShardLock;
+import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.index.shard.ShardPath;
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
+import java.util.Set;
@@ -73,6 +79,22 @@ public GatewayMetaState(Settings settings, NodeEnvironment nodeEnv, MetaStateSer
+ if (DiscoveryNode.dataNode(settings)) {
+ MultiDataPathUpgrader upgrader = new MultiDataPathUpgrader(nodeEnv);
+ final Set allShardIds = nodeEnv.findAllShardIds();
+ for (ShardId shardId : allShardIds) {
+ try (ShardLock lock = nodeEnv.shardLock(shardId, 0)) {
+ if (upgrader.needsUpgrading(shardId, ImmutableSettings.EMPTY)) { // nocommit - where are we getting the custom paths from?
+ final ShardPath shardPath = upgrader.pickShardPath(shardId, ImmutableSettings.EMPTY);
+ upgrader.upgrade(shardId, shardPath);
+ if (Files.exists(shardPath.resolveIndex())) {
+ upgrader.checkIndex(shardPath);
+ }
+ }
+ }
+ }
+ }
if (DiscoveryNode.masterNode(settings) || DiscoveryNode.dataNode(settings)) {
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.env.NodeEnvironment;
+import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardStateMetaData;
import org.elasticsearch.threadpool.ThreadPool;
@@ -45,6 +47,8 @@
import java.util.List;
import java.util.concurrent.atomic.AtomicReferenceArray;
+import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
@@ -117,7 +121,7 @@ protected NodeGatewayStartedShards nodeOperation(NodeRequest request) throws Ela
final ShardId shardId = request.getShardId();
final String indexUUID = request.getIndexUUID();
logger.trace("{} loading local shard state info", shardId);
- final ShardStateMetaData shardStateMetaData = ShardStateMetaData.FORMAT.loadLatestState(logger, nodeEnv.shardPaths(request.shardId));
+ ShardStateMetaData shardStateMetaData = ShardStateMetaData.FORMAT.loadLatestState(logger, nodeEnv.shardPaths(request.shardId));
if (shardStateMetaData != null) {
// old shard metadata doesn't have the actual index UUID so we need to check if the actual uuid in the metadata
// is equal to IndexMetaData.INDEX_UUID_NA_VALUE otherwise this shard doesn't belong to the requested index.
@@ -155,6 +159,7 @@ public Request(ShardId shardId, String indexUUID, String[] nodesIds) {
this.indexUUID = indexUUID;
public ShardId shardId() {
return this.shardId;
import org.elasticsearch.index.search.stats.ShardSearchModule;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.settings.IndexSettingsService;
-import org.elasticsearch.index.shard.IndexShard;
-import org.elasticsearch.index.shard.IndexShardCreationException;
-import org.elasticsearch.index.shard.IndexShardModule;
-import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.index.shard.*;
import org.elasticsearch.index.similarity.SimilarityService;
import org.elasticsearch.index.snapshots.IndexShardSnapshotModule;
import org.elasticsearch.index.store.IndexStore;
@@ -296,6 +293,16 @@ public synchronized IndexShard createShard(int sShardId, boolean primary) throws
boolean success = false;
Injector shardInjector = null;
try {
+ ShardPath path = ShardPath.loadShardPath(logger, nodeEnv, shardId, indexSettings);
+ if (path == null) {
+ path = ShardPath.findShardPath(nodeEnv, shardId, indexSettings);
+ logger.debug("found path {}", path);
+ } else {
+ logger.debug("using loaded path {}", path);
+ }
lock = nodeEnv.shardLock(shardId, TimeUnit.SECONDS.toMillis(5));
if (shards.containsKey(shardId.id())) {
throw new IndexShardAlreadyExistsException(shardId + " already exists");
@@ -313,8 +320,8 @@ public synchronized IndexShard createShard(int sShardId, boolean primary) throws
modules.add(new ShardIndexingModule());
modules.add(new ShardSearchModule());
modules.add(new ShardGetModule());
- modules.add(new StoreModule(indexSettings, injector.getInstance(IndexStore.class), lock,
- new StoreCloseListener(shardId, canDeleteShardContent)));
+ modules.add(new StoreModule(indexSettings, injector.getInstance(IndexStore.class).shardDirectory(), lock,
+ new StoreCloseListener(shardId, canDeleteShardContent), path));
modules.add(new DeletionPolicyModule(indexSettings));
modules.add(new MergePolicyModule(indexSettings));
modules.add(new MergeSchedulerModule(indexSettings));
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.CancellableThreads;
import org.elasticsearch.common.util.concurrent.FutureUtils;
-import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.engine.Engine;
+import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.engine.FlushNotAllowedEngineException;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
@@ -166,29 +166,23 @@ public void recover(boolean indexShouldExists, RecoveryState recoveryState) thro
logger.debug("failed to list file details", e);
+ final Translog translog = indexShard.translog();
+ final Path translogName = translog.getPath(translogId);
+ logger.trace("try recover from translog file {} locations: {}", translogName, translog.location());
Path recoveringTranslogFile = null;
- if (translogId == -1) {
- logger.trace("no translog id set (indexShouldExist [{}])", indexShouldExists);
- } else {
- final Translog translog = indexShard.translog();
- final Path translogName = translog.getPath(translogId);
- logger.trace("try recover from translog file {} locations: {}", translogName, Arrays.toString(translog.locations()));
- for (Path translogLocation : translog.locations()) {
- // we have to support .recovering since it's a leftover from previous version but might still be on the filesystem
- // we used to rename the foo into foo.recovering since foo was reused / overwritten but we fixed that in 1.5
- for (Path recoveryFiles : FileSystemUtils.files(translogLocation, translogName.getFileName() + "{.recovering,}")) {
- logger.trace("Translog file found in {}", recoveryFiles);
- recoveringTranslogFile = recoveryFiles;
- break OUTER;
- }
- logger.trace("Translog file NOT found in {} - continue", translogLocation);
- }
+ // we have to support .recovering since it's a leftover from previous version but might still be on the filesystem
+ // we used to rename the foo into foo.recovering since foo was reused / overwritten but we fixed that in 1.5
+ for (Path recoveryFiles : FileSystemUtils.files( translog.location(), translogName.getFileName() + "{.recovering,}")) {
+ logger.trace("Translog file found in {}", recoveryFiles);
+ recoveringTranslogFile = recoveryFiles;
+ break;
+ logger.trace("Translog file NOT found in {} - continue", translog.location());
// we must do this *after* we capture translog name so the engine creation will not make a new one.
// also we have to do this regardless of whether we have a translog, to follow the recovery stages.
if (recoveringTranslogFile == null || Files.exists(recoveringTranslogFile) == false) {
// no translog files, bail
diff --git a/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/src/main/java/org/elasticsearch/index/shard/IndexShard.java
index 067978a24bcf3..db2f69cdf4441 100644
--- a/src/main/java/org/elasticsearch/index/shard/IndexShard.java
+++ b/src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -190,6 +190,7 @@ public class IndexShard extends AbstractIndexShardComponent {
* This setting is realtime updateable.
public static final String INDEX_FLUSH_ON_CLOSE = "index.flush_on_close";
+ private final ShardPath path;
public IndexShard(ShardId shardId, @IndexSettings Settings indexSettings, IndexSettingsService indexSettingsService, IndicesLifecycle indicesLifecycle, Store store, MergeSchedulerProvider mergeScheduler, Translog translog,
@@ -197,7 +198,7 @@ public IndexShard(ShardId shardId, @IndexSettings Settings indexSettings, IndexS
ShardFilterCache shardFilterCache, ShardFieldData shardFieldData, PercolatorQueriesRegistry percolatorQueriesRegistry, ShardPercolateService shardPercolateService, CodecService codecService,
ShardTermVectorsService termVectorsService, IndexFieldDataService indexFieldDataService, IndexService indexService, ShardSuggestService shardSuggestService, ShardQueryCache shardQueryCache, ShardBitsetFilterCache shardBitsetFilterCache,
@Nullable IndicesWarmer warmer, SnapshotDeletionPolicy deletionPolicy, SimilarityService similarityService, MergePolicyProvider mergePolicyProvider, EngineFactory factory,
- ClusterService clusterService, NodeEnvironment nodeEnv) {
+ ClusterService clusterService, NodeEnvironment nodeEnv, ShardPath path) {
super(shardId, indexSettings);
Preconditions.checkNotNull(store, "Store must be provided to the index shard");
Preconditions.checkNotNull(deletionPolicy, "Snapshot deletion policy must be provided to the index shard");
@@ -234,8 +235,8 @@ public IndexShard(ShardId shardId, @IndexSettings Settings indexSettings, IndexS
this.flushOnClose = indexSettings.getAsBoolean(INDEX_FLUSH_ON_CLOSE, true);
this.nodeEnv = nodeEnv;
this.mapperAnalyzer = new MapperAnalyzer(mapperService);
+ this.path = path;
/* create engine config */
this.config = new EngineConfig(shardId,
@@ -1011,10 +1012,13 @@ public final boolean isFlushOnClose() {
* @throws IOException if the delete fails
public void deleteShardState() throws IOException {
- if (this.routingEntry() != null && this.routingEntry().active()) {
+ if (this.routingEntry() != null && this.routingEntry().active()) {
throw new ElasticsearchIllegalStateException("Can't delete shard state on a active shard");
- MetaDataStateFormat.deleteMetaState(nodeEnv.shardPaths(shardId));
+ MetaDataStateFormat.deleteMetaState(shardPath().getDataPath());
+ }
+ public ShardPath shardPath() {
+ return path;
private class ApplyRefreshSettings implements IndexSettingsService.Listener {
@@ -1275,7 +1279,7 @@ void persistMetadata(ShardRouting newRouting, ShardRouting currentRouting) {
final ShardStateMetaData newShardStateMetadata = new ShardStateMetaData(newRouting.version(), newRouting.primary(), getIndexUUID());
logger.trace("{} writing shard state, reason [{}]", shardId, writeReason);
- ShardStateMetaData.FORMAT.write(newShardStateMetadata, newShardStateMetadata.version, nodeEnv.shardPaths(shardId));
+ ShardStateMetaData.FORMAT.write(newShardStateMetadata, newShardStateMetadata.version, shardPath().getShardStatePath());
} catch (IOException e) { // this is how we used to handle it.... :(
logger.warn("failed to write shard state", e);
// we failed to write the shard state, we will try and write
IndexService indexService, ShardSuggestService shardSuggestService, ShardQueryCache shardQueryCache,
ShardBitsetFilterCache shardBitsetFilterCache, @Nullable IndicesWarmer warmer,
SnapshotDeletionPolicy deletionPolicy, SimilarityService similarityService,
- MergePolicyProvider mergePolicyProvider, EngineFactory factory, ClusterService clusterService, NodeEnvironment nodeEnv) {
+ MergePolicyProvider mergePolicyProvider, EngineFactory factory, ClusterService clusterService, NodeEnvironment nodeEnv, ShardPath path) {
super(shardId, indexSettings, indexSettingsService, indicesLifecycle, store, mergeScheduler,
translog, threadPool, mapperService, queryParserService, indexCache, indexAliasesService,
indexingService, getService, searchService, shardWarmerService, shardFilterCache,
shardFieldData, percolatorQueriesRegistry, shardPercolateService, codecService,
termVectorsService, indexFieldDataService, indexService, shardSuggestService,
shardQueryCache, shardBitsetFilterCache, warmer, deletionPolicy, similarityService,
- mergePolicyProvider, factory, clusterService, nodeEnv);
+ mergePolicyProvider, factory, clusterService, nodeEnv, path);
diff --git a/src/main/java/org/elasticsearch/index/shard/ShardPath.java b/src/main/java/org/elasticsearch/index/shard/ShardPath.java
+package org.elasticsearch.index.shard;
+import org.elasticsearch.ElasticsearchIllegalStateException;
+import org.elasticsearch.cluster.metadata.IndexMetaData;
+import org.elasticsearch.common.collect.Tuple;
+import org.elasticsearch.common.logging.ESLogger;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.env.NodeEnvironment;
+import org.elasticsearch.index.Index;
+import org.elasticsearch.index.settings.IndexSettings;
+import java.io.IOException;
+import java.nio.file.FileStore;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+public final class ShardPath {
+ public static final String INDEX_FOLDER_NAME = "index";
+ public static final String TRANSLOG_FOLDER_NAME = "translog";
+ private final Path path;
+ private final String indexUUID;
+ private final ShardId shardId;
+ private final Path shardStatePaths;
+ public ShardPath(Path path, Path shardStatePaths, String indexUUID, ShardId shardId) {
+ this.path = path;
+ this.indexUUID = indexUUID;
+ this.shardId = shardId;
+ this.shardStatePaths = shardStatePaths;
+ }
+ public Path resolveTranslog() {
+ return path.resolve(TRANSLOG_FOLDER_NAME);
+ }
+ public Path resolveIndex() {
+ return path.resolve(INDEX_FOLDER_NAME);
+ }
+ public Path getDataPath() {
+ return path;
+ }
+ public boolean exists() {
+ return Files.exists(path);
+ }
+ public String getIndexUUID() {
+ return indexUUID;
+ }
+ public ShardId getShardId() {
+ return shardId;
+ }
+ public Path getShardStatePath() {
+ return shardStatePaths;
+ }
+ /**
+ * This method walks through the nodes shard paths to find the data and state path for the given shard. If multiple
+ * directories with a valid shard state exist the one with the highest version will be used.
+ * Note: this method resolves custom data locations for the shard.
+ */
+ public static ShardPath loadShardPath(ESLogger logger, NodeEnvironment env, ShardId shardId, @IndexSettings Settings indexSettings) throws IOException {
+ final String indexUUID = indexSettings.get(IndexMetaData.SETTING_UUID, IndexMetaData.INDEX_UUID_NA_VALUE);
+ final Path[] paths = env.shardPaths(shardId);
+ Tuple metaDataTuple = null;
+ for (Path path : paths) {
+ ShardStateMetaData load = ShardStateMetaData.FORMAT.loadLatestState(logger, path);
+ if (load != null) {
+ if (metaDataTuple == null || metaDataTuple.v2().version < load.version) {
+ metaDataTuple = new Tuple<>(path, load);
+ if ((load.indexUUID.equals(indexUUID) || IndexMetaData.INDEX_UUID_NA_VALUE.equals(load.indexUUID)) == false) {
+ throw new ElasticsearchIllegalStateException("index UUID for shard [" + shardId + "] was: " + load.indexUUID + " excepted: " + indexUUID + " on shard path: " + path);
+ }
+ }
+ }
+ }
+ if (metaDataTuple == null) {
+ return null;
+ } else {
+ final Path dataPath;
+ final Path statePath = metaDataTuple.v1();
+ if (NodeEnvironment.hasCustomDataPath(indexSettings)) {
+ dataPath = env.resolveCustomLocation(indexSettings, shardId);
+ } else {
+ dataPath = statePath;
+ }
+ logger.debug("{} loaded data path [{}], state path [{}]", shardId, dataPath, statePath);
+ return new ShardPath(dataPath, statePath, indexUUID, shardId);
+ }
+ }
+ // nocommit - we need something more extensible but this does the job for now...
+ public static ShardPath findShardPath(NodeEnvironment env, ShardId shardId, @IndexSettings Settings indexSettings) throws IOException {
+ final String indexUUID = indexSettings.get(IndexMetaData.SETTING_UUID, IndexMetaData.INDEX_UUID_NA_VALUE);
+ final Path[] paths = env.shardPaths(shardId);
+ final List> minUsedPaths = new ArrayList<>();
+ for (Path shardPath : paths) {
+ Path path = shardPath;
+ while (Files.exists(path) == false) {
+ path = path.getParent();
+ }
+ FileStore fileStore = Files.getFileStore(path);
+ long usableSpace = fileStore.getUsableSpace();
+ if (minUsedPaths.isEmpty() || minUsedPaths.get(0).v2() == usableSpace) {
+ minUsedPaths.add(new Tuple<>(shardPath, usableSpace));
+ } else if (minUsedPaths.get(0).v2() < usableSpace) {
+ minUsedPaths.clear();
+ minUsedPaths.add(new Tuple<>(shardPath, usableSpace));
+ }
+ }
+ Path minUsed = minUsedPaths.get(shardId.id() % minUsedPaths.size()).v1();
+ final Path dataPath;
+ final Path statePath = minUsed;
+ if (NodeEnvironment.hasCustomDataPath(indexSettings)) {
+ dataPath = env.resolveCustomLocation(indexSettings, shardId);
+ } else {
+ dataPath = statePath;
+ }
+ return new ShardPath(dataPath, statePath, indexUUID, shardId);
+ }
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ ShardPath shardPath = (ShardPath) o;
+ if (shardId != null ? !shardId.equals(shardPath.shardId) : shardPath.shardId != null) return false;
+ if (indexUUID != null ? !indexUUID.equals(shardPath.indexUUID) : shardPath.indexUUID != null) return false;
+ if (path != null ? !path.equals(shardPath.path) : shardPath.path != null) return false;
+ return true;
+ }
+ @Override
+ public int hashCode() {
+ int result = path != null ? path.hashCode() : 0;
+ result = 31 * result + (indexUUID != null ? indexUUID.hashCode() : 0);
+ result = 31 * result + (shardId != null ? shardId.hashCode() : 0);
+ return result;
+ }
+ @Override
+ public String toString() {
+ return "ShardPath{" +
+ "path=" + path +
+ ", indexUUID='" + indexUUID + '\'' +
+ ", shard=" + shardId +
+ '}';
+ }
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.logging.ESLogger;
+import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
@@ -142,5 +143,4 @@ public ShardStateMetaData fromXContent(XContentParser parser) throws IOException
return new ShardStateMetaData(version, primary, indexUUID);
import org.apache.lucene.store.Directory;
-import org.apache.lucene.store.FilterDirectory;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.ShardId;
-import org.elasticsearch.index.store.distributor.Distributor;
import java.io.IOException;
@@ -37,25 +35,7 @@ protected DirectoryService(ShardId shardId, @IndexSettings Settings indexSetting
super(shardId, indexSettings);
- public abstract Directory[] build() throws IOException;
public abstract long throttleTimeInNanos();
- /**
- * Creates a new Directory from the given distributor.
- * The default implementation returns a new {@link org.elasticsearch.index.store.DistributorDirectory}
- * if there is more than one data path in the distributor.
- */
- public Directory newFromDistributor(final Distributor distributor) throws IOException {
- if (distributor.all().length == 1) {
- // use filter dir for consistent toString methods
- return new FilterDirectory(distributor.primary()) {
- @Override
- public String toString() {
- return distributor.toString();
- }
- };
- }
- return new DistributorDirectory(distributor);
- }
+ public abstract Directory newDirectory() throws IOException;
\ No newline at end of file
Class extends DirectoryService> shardDirectory();
- /**
- * Return an array of all index folder locations for a given shard
- */
- Path[] shardIndexLocations(ShardId shardId);
- /**
- * Return an array of all translog folder locations for a given shard
- */
- Path[] shardTranslogLocations(ShardId shardId);
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.ShardId;
-import org.elasticsearch.index.store.distributor.Distributor;
import java.io.*;
import java.nio.file.NoSuchFileException;
@@ -106,18 +105,17 @@ protected void closeInternal() {
- public Store(ShardId shardId, @IndexSettings Settings indexSettings, DirectoryService directoryService, Distributor distributor, ShardLock shardLock) throws IOException {
- this(shardId, indexSettings, directoryService, distributor, shardLock, OnClose.EMPTY);
+ public Store(ShardId shardId, @IndexSettings Settings indexSettings, DirectoryService directoryService, ShardLock shardLock) throws IOException {
+ this(shardId, indexSettings, directoryService, shardLock, OnClose.EMPTY);
- public Store(ShardId shardId, @IndexSettings Settings indexSettings, DirectoryService directoryService, Distributor distributor, ShardLock shardLock, OnClose onClose) throws IOException {
+ public Store(ShardId shardId, @IndexSettings Settings indexSettings, DirectoryService directoryService, ShardLock shardLock, OnClose onClose) throws IOException {
super(shardId, indexSettings);
- this.directory = new StoreDirectory(directoryService.newFromDistributor(distributor), Loggers.getLogger("index.store.deletes", indexSettings, shardId));
+ this.directory = new StoreDirectory(directoryService.newDirectory(), Loggers.getLogger("index.store.deletes", indexSettings, shardId));
this.shardLock = shardLock;
this.onClose = onClose;
final TimeValue refreshInterval = indexSettings.getAsTime(INDEX_STORE_STATS_REFRESH_INTERVAL, TimeValue.timeValueSeconds(10));
this.statsCache = new StoreStatsCache(refreshInterval, directory, directoryService);
logger.debug("store stats are refreshed with refresh_interval [{}]", refreshInterval);
@@ -359,21 +357,14 @@ private void closeInternal() {
* @throws IOException if the index we try to read is corrupted
- public static MetadataSnapshot readMetadataSnapshot(Path[] indexLocations, ESLogger logger) throws IOException {
- final Directory[] dirs = new Directory[indexLocations.length];
- try {
- for (int i = 0; i < indexLocations.length; i++) {
- dirs[i] = new SimpleFSDirectory(indexLocations[i]);
- }
- DistributorDirectory dir = new DistributorDirectory(dirs);
+ public static MetadataSnapshot readMetadataSnapshot(Path indexLocations, ESLogger logger) throws IOException {
+ try (Directory dir = new SimpleFSDirectory(indexLocations)){
failIfCorrupted(dir, new ShardId("", 1));
return new MetadataSnapshot(null, dir, logger);
} catch (IndexNotFoundException ex) {
// that's fine - happens all the time no need to log
} catch (FileNotFoundException | NoSuchFileException ex) {
logger.info("Failed to open / find files while reading metadata snapshot");
- } finally {
- IOUtils.close(dirs);
return MetadataSnapshot.EMPTY;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.util.Callback;
import org.elasticsearch.env.ShardLock;
-import org.elasticsearch.index.shard.ShardId;
-import org.elasticsearch.index.store.distributor.Distributor;
-import org.elasticsearch.index.store.distributor.LeastUsedDistributor;
-import org.elasticsearch.index.store.distributor.RandomWeightedDistributor;
+import org.elasticsearch.index.shard.ShardPath;
@@ -39,48 +35,26 @@ public class StoreModule extends AbstractModule {
private final Settings settings;
- private final IndexStore indexStore;
private final ShardLock lock;
private final Store.OnClose closeCallback;
+ private final ShardPath path;
+ private final Class extends DirectoryService> shardDirectory;
- private Class extends Distributor> distributor;
- public StoreModule(Settings settings, IndexStore indexStore, ShardLock lock, Store.OnClose closeCallback) {
- this.indexStore = indexStore;
+ public StoreModule(Settings settings, Class extends DirectoryService> shardDirectory, ShardLock lock, Store.OnClose closeCallback, ShardPath path) {
+ this.shardDirectory = shardDirectory;
this.settings = settings;
this.lock = lock;
this.closeCallback = closeCallback;
- }
- public void setDistributor(Class extends Distributor> distributor) {
- this.distributor = distributor;
+ this.path = path;
protected void configure() {
- bind(DirectoryService.class).to(indexStore.shardDirectory()).asEagerSingleton();
+ bind(DirectoryService.class).to(shardDirectory).asEagerSingleton();
- if (distributor == null) {
- distributor = loadDistributor(settings);
- }
- bind(Distributor.class).to(distributor).asEagerSingleton();
+ bind(ShardPath.class).toInstance(path);
- private Class extends Distributor> loadDistributor(Settings settings) {
- final Class extends Distributor> distributor;
- final String type = settings.get(DISTIBUTOR_KEY);
- if ("least_used".equals(type)) {
- distributor = LeastUsedDistributor.class;
- } else if ("random".equals(type)) {
- distributor = RandomWeightedDistributor.class;
- } else {
- distributor = settings.getAsClass(DISTIBUTOR_KEY, LeastUsedDistributor.class,
- "org.elasticsearch.index.store.distributor.", "Distributor");
- }
- return distributor;
- }
+ public DefaultFsDirectoryService(ShardId shardId, @IndexSettings Settings indexSettings, IndexStore indexStore, ShardPath shardPath) {
+ super(shardId, indexSettings, indexStore, shardPath);
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.index.shard.ShardPath;
import org.elasticsearch.index.store.DirectoryService;
import org.elasticsearch.index.store.IndexStore;
import org.elasticsearch.index.store.StoreException;
@@ -39,9 +40,11 @@ public abstract class FsDirectoryService extends DirectoryService implements Sto
protected final IndexStore indexStore;
private final CounterMetric rateLimitingTimeInNanos = new CounterMetric();
+ private final ShardPath path;
- public FsDirectoryService(ShardId shardId, @IndexSettings Settings indexSettings, IndexStore indexStore) {
+ public FsDirectoryService(ShardId shardId, @IndexSettings Settings indexSettings, IndexStore indexStore, ShardPath path) {
super(shardId, indexSettings);
+ this.path = path;
this.indexStore = indexStore;
@@ -68,19 +71,14 @@ protected final LockFactory buildLockFactory() throws IOException {
return lockFactory;
- public Directory[] build() throws IOException {
- Path[] locations = indexStore.shardIndexLocations(shardId);
- Directory[] dirs = new Directory[locations.length];
- for (int i = 0; i < dirs.length; i++) {
- Files.createDirectories(locations[i]);
- Directory wrapped = newFSDirectory(locations[i], buildLockFactory());
- dirs[i] = new RateLimitedFSDirectory(wrapped, this, this) ;
- }
- return dirs;
+ public Directory newDirectory() throws IOException {
+ Path locations = path.resolveIndex();
+ Files.createDirectories(locations);
+ Directory wrapped = newFSDirectory(locations, buildLockFactory());
+ return new RateLimitedFSDirectory(wrapped, this, this) ;
protected abstract Directory newFSDirectory(Path location, LockFactory lockFactory) throws IOException;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.index.shard.ShardPath;
import org.elasticsearch.index.store.IndexStore;
import java.io.File;
@@ -37,8 +38,8 @@
public class MmapFsDirectoryService extends FsDirectoryService {
- public MmapFsDirectoryService(ShardId shardId, @IndexSettings Settings indexSettings, IndexStore indexStore) {
- super(shardId, indexSettings, indexStore);
+ public MmapFsDirectoryService(ShardId shardId, @IndexSettings Settings indexSettings, IndexStore indexStore, ShardPath shardPath) {
+ super(shardId, indexSettings, indexStore, shardPath);
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.index.shard.ShardPath;
import org.elasticsearch.index.store.IndexStore;
import java.io.File;
@@ -37,8 +38,8 @@
public class NioFsDirectoryService extends FsDirectoryService {
- public NioFsDirectoryService(ShardId shardId, @IndexSettings Settings indexSettings, IndexStore indexStore) {
- super(shardId, indexSettings, indexStore);
+ public NioFsDirectoryService(ShardId shardId, @IndexSettings Settings indexSettings, IndexStore indexStore, ShardPath shardPath) {
+ super(shardId, indexSettings, indexStore, shardPath);
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.index.shard.ShardPath;
import org.elasticsearch.index.store.IndexStore;
import java.io.File;
@@ -37,8 +38,8 @@
public class SimpleFsDirectoryService extends FsDirectoryService {
- public SimpleFsDirectoryService(ShardId shardId, @IndexSettings Settings indexSettings, IndexStore indexStore) {
- super(shardId, indexSettings, indexStore);
+ public SimpleFsDirectoryService(ShardId shardId, @IndexSettings Settings indexSettings, IndexStore indexStore, ShardPath shardPath) {
+ super(shardId, indexSettings, indexStore, shardPath);
public static final String INDEX_STORE_THROTTLE_TYPE = "index.store.throttle.type";
public static final String INDEX_STORE_THROTTLE_MAX_BYTES_PER_SEC = "index.store.throttle.max_bytes_per_sec";
- public static final String INDEX_FOLDER_NAME = "index";
- public static final String TRANSLOG_FOLDER_NAME = "translog";
class ApplySettings implements IndexSettingsService.Listener {
public void onRefreshSettings(Settings settings) {
@@ -114,6 +111,7 @@ protected AbstractIndexStore(Index index, @IndexSettings Settings indexSettings,
} else {
this.locations = null;
@@ -125,36 +123,4 @@ public void close() throws ElasticsearchException {
public StoreRateLimiting rateLimiting() {
return nodeRateLimiting ? indicesStore.rateLimiting() : this.rateLimiting;
- /**
- * Return an array of all index folder locations for a given shard. Uses
- * the index settings to determine if a custom data path is set for the
- * index and uses that if applicable.
- */
- @Override
- public Path[] shardIndexLocations(ShardId shardId) {
- Path[] shardLocations = nodeEnv.shardDataPaths(shardId, indexSettings);
- Path[] locations = new Path[shardLocations.length];
- for (int i = 0; i < shardLocations.length; i++) {
- locations[i] = shardLocations[i].resolve(INDEX_FOLDER_NAME);
- }
- logger.debug("using [{}] as shard's index location", locations);
- return locations;
- }
- /**
- * Return an array of all translog folder locations for a given shard. Uses
- * the index settings to determine if a custom data path is set for the
- * index and uses that if applicable.
- */
- @Override
- public Path[] shardTranslogLocations(ShardId shardId) {
- Path[] shardLocations = nodeEnv.shardDataPaths(shardId, indexSettings);
- Path[] locations = new Path[shardLocations.length];
- for (int i = 0; i < shardLocations.length; i++) {
- locations[i] = shardLocations[i].resolve(TRANSLOG_FOLDER_NAME);
- }
- logger.debug("using [{}] as shard's translog location", locations);
- return locations;
- }
* These paths don't contain actual translog files they are
* directories holding the transaction logs.
- public Path[] locations();
+ public Path location();
* Returns the translog file with the given id as a Path. This
diff --git a/src/main/java/org/elasticsearch/index/translog/fs/FsTranslog.java b/src/main/java/org/elasticsearch/index/translog/fs/FsTranslog.java
index 6044cc88758f9..30d353e1fbacb 100644
--- a/src/main/java/org/elasticsearch/index/translog/fs/FsTranslog.java
+++ b/src/main/java/org/elasticsearch/index/translog/fs/FsTranslog.java
@@ -35,6 +35,7 @@
import org.elasticsearch.index.settings.IndexSettingsService;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.index.shard.ShardPath;
import org.elasticsearch.index.store.IndexStore;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogException;
@@ -46,7 +47,6 @@
import java.nio.file.*;
import java.util.Collection;
import java.util.Collections;
-import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.regex.Matcher;
@@ -76,7 +76,7 @@ public void onRefreshSettings(Settings settings) {
private final BigArrays bigArrays;
private final ReadWriteLock rwl = new ReentrantReadWriteLock();
- private final Path[] locations;
+ private final Path location;
private volatile FsTranslogFile current;
private volatile FsTranslogFile trans;
@@ -94,26 +94,22 @@ public void onRefreshSettings(Settings settings) {
public FsTranslog(ShardId shardId, @IndexSettings Settings indexSettings, IndexSettingsService indexSettingsService,
- BigArrays bigArrays, IndexStore indexStore) throws IOException {
+ BigArrays bigArrays, ShardPath shardPath) throws IOException {
super(shardId, indexSettings);
- }
+ this.location = shardPath.resolveTranslog();
+ Files.createDirectories(location);
this.type = FsTranslogFile.Type.fromString(indexSettings.get("index.translog.fs.type", FsTranslogFile.Type.BUFFERED.name()));
this.bufferSize = (int) indexSettings.getAsBytesSize("index.translog.fs.buffer_size", ByteSizeValue.parseBytesSizeValue("64k")).bytes(); // Not really interesting, updated by IndexingMemoryController...
this.transientBufferSize = (int) indexSettings.getAsBytesSize("index.translog.fs.transient_buffer_size", ByteSizeValue.parseBytesSizeValue("8k")).bytes();
public FsTranslog(ShardId shardId, @IndexSettings Settings indexSettings, Path location) throws IOException {
super(shardId, indexSettings);
this.indexSettingsService = null;
- this.locations = new Path[]{location};
+ this.location = location;
this.bigArrays = BigArrays.NON_RECYCLING_INSTANCE;
@@ -153,8 +149,8 @@ public void close() throws IOException {
- public Path[] locations() {
- return locations;
+ public Path location() {
+ return location;
@@ -198,19 +194,15 @@ public long translogSizeInBytes() {
public int clearUnreferenced() {
int deleted = 0;
- try {
- for (Path location : locations) {
- try (DirectoryStream stream = Files.newDirectoryStream(location, TRANSLOG_FILE_PREFIX + "[0-9]*")) {
- for (Path file : stream) {
- if (isReferencedTranslogFile(file) == false) {
- try {
- logger.trace("delete unreferenced translog file: " + file);
- Files.delete(file);
- deleted++;
- } catch (Exception ex) {
- logger.debug("failed to delete " + file, ex);
- }
- }
+ try (DirectoryStream stream = Files.newDirectoryStream(location, TRANSLOG_FILE_PREFIX + "[0-9]*")) {
+ for (Path file : stream) {
+ if (isReferencedTranslogFile(file) == false) {
+ try {
+ logger.trace("delete unreferenced translog file: " + file);
+ Files.delete(file);
+ deleted++;
+ } catch (Exception ex) {
+ logger.debug("failed to delete " + file, ex);
@@ -227,17 +219,6 @@ public void newTranslog(long id) throws TranslogException, IOException {
try {
FsTranslogFile newFile;
- long size = Long.MAX_VALUE;
- Path location = null;
- for (Path file : locations) {
- long currentFree = Files.getFileStore(file).getUsableSpace();
- if (currentFree < size) {
- size = currentFree;
- location = file;
- } else if (currentFree == size && ThreadLocalRandom.current().nextBoolean()) {
- location = file;
- }
- }
try {
newFile = type.create(shardId, id, new InternalChannelReference(location.resolve(getPath(id)), StandardOpenOption.READ, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW), bufferSize);
} catch (IOException e) {
@@ -256,17 +237,6 @@ public void newTransientTranslog(long id) throws TranslogException {
try {
assert this.trans == null;
- long size = Long.MAX_VALUE;
- Path location = null;
- for (Path file : locations) {
- long currentFree = Files.getFileStore(file).getUsableSpace();
- if (currentFree < size) {
- size = currentFree;
- location = file;
- } else if (currentFree == size && ThreadLocalRandom.current().nextBoolean()) {
- location = file;
- }
- }
this.trans = type.create(shardId, id, new InternalChannelReference(location.resolve(getPath(id)), StandardOpenOption.READ, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW), transientBufferSize);
} catch (IOException e) {
throw new TranslogException(shardId, "failed to create new translog file", e);
@@ -452,18 +422,16 @@ public long findLargestPresentTranslogId() throws IOException {
try {
long maxId = this.currentId();
- for (Path location : locations()) {
- try (DirectoryStream stream = Files.newDirectoryStream(location, TRANSLOG_FILE_PREFIX + "[0-9]*")) {
- for (Path translogFile : stream) {
- try {
- final String fileName = translogFile.getFileName().toString();
- final Matcher matcher = PARSE_ID_PATTERN.matcher(fileName);
- if (matcher.matches()) {
- maxId = Math.max(maxId, Long.parseLong(matcher.group(1)));
- }
- } catch (NumberFormatException ex) {
- logger.warn("Couldn't parse translog id from file " + translogFile + " skipping");
+ try (DirectoryStream stream = Files.newDirectoryStream(location, TRANSLOG_FILE_PREFIX + "[0-9]*")) {
+ for (Path translogFile : stream) {
+ try {
+ final String fileName = translogFile.getFileName().toString();
+ final Matcher matcher = PARSE_ID_PATTERN.matcher(fileName);
+ if (matcher.matches()) {
+ maxId = Math.max(maxId, Long.parseLong(matcher.group(1)));
+ } catch (NumberFormatException ex) {
+ logger.warn("Couldn't parse translog id from file " + translogFile + " skipping");
import java.io.Closeable;
import java.io.IOException;
+import java.nio.file.Files;
import java.nio.file.Path;
import java.util.*;
import java.util.concurrent.CountDownLatch;
@@ -587,8 +588,11 @@ private boolean canDeleteShardContent(ShardId shardId, @IndexSettings Settings i
final IndexService indexService = indexServiceInjectorTuple.v1();
return indexService.hasShard(shardId.id()) == false;
} else if (nodeEnv.hasNodeFile()) {
- final Path[] shardLocations = nodeEnv.shardDataPaths(shardId, indexSettings);
- return FileSystemUtils.exists(shardLocations);
+ if (NodeEnvironment.hasCustomDataPath(indexSettings)) {
+ return Files.exists(nodeEnv.resolveCustomLocation(indexSettings, shardId));
+ } else {
+ return FileSystemUtils.exists(nodeEnv.shardPaths(shardId));
+ }
} else {
logger.trace("{} skipping shard directory deletion due to shadow replicas", shardId);
public InternalIndicesLifecycle(Settings settings) {
public void addListener(Listener listener) {
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.inject.Inject;
-import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
@@ -43,6 +42,7 @@
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.index.shard.ShardPath;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.StoreFileMetaData;
import org.elasticsearch.indices.IndicesService;
@@ -50,7 +50,6 @@
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
-import java.nio.file.Path;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -173,16 +172,11 @@ private StoreFilesMetaData listStoreMetaData(ShardId shardId) throws IOException
if (!storeType.contains("fs")) {
return new StoreFilesMetaData(false, shardId, ImmutableMap.of());
- Path[] shardLocations = nodeEnv.shardDataPaths(shardId, metaData.settings());
- Path[] shardIndexLocations = new Path[shardLocations.length];
- for (int i = 0; i < shardLocations.length; i++) {
- shardIndexLocations[i] = shardLocations[i].resolve("index");
- }
- exists = FileSystemUtils.exists(shardIndexLocations);
- if (!exists) {
+ final ShardPath shardPath = ShardPath.loadShardPath(logger, nodeEnv, shardId, metaData.settings());
+ if (shardPath == null) {
return new StoreFilesMetaData(false, shardId, ImmutableMap.of());
- return new StoreFilesMetaData(false, shardId, Store.readMetadataSnapshot(shardIndexLocations, logger).asMap());
+ return new StoreFilesMetaData(false, shardId, Store.readMetadataSnapshot(shardPath.resolveIndex(), logger).asMap());
} finally {
TimeValue took = new TimeValue(System.currentTimeMillis() - startTime);
if (exists) {
@@ -220,14 +214,6 @@ public ShardId shardId() {
return this.shardId;
- public long totalSizeInBytes() {
- long totalSizeInBytes = 0;
- for (StoreFileMetaData file : this) {
- totalSizeInBytes += file.length();
- }
- return totalSizeInBytes;
- }
public Iterator iterator() {
return files.values().iterator();
diff --git a/src/test/java/org/elasticsearch/cluster/allocation/ClusterRerouteTests.java b/src/test/java/org/elasticsearch/cluster/allocation/ClusterRerouteTests.java
index 3b0c1e99f3720..edc9e699366a7 100644
--- a/src/test/java/org/elasticsearch/cluster/allocation/ClusterRerouteTests.java
+++ b/src/test/java/org/elasticsearch/cluster/allocation/ClusterRerouteTests.java
@@ -36,6 +36,7 @@
import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
+import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.shard.ShardId;
diff --git a/src/test/java/org/elasticsearch/common/util/MultiDataPathUpgraderTests.java b/src/test/java/org/elasticsearch/common/util/MultiDataPathUpgraderTests.java
new file mode 100644
index 0000000000000..1c9c74a02e8dd
--- /dev/null
+++ b/src/test/java/org/elasticsearch/common/util/MultiDataPathUpgraderTests.java
@@ -0,0 +1,298 @@
+package org.elasticsearch.common.util;
+import com.google.common.base.Charsets;
+import com.google.common.collect.Sets;
+import org.apache.lucene.util.TestUtil;
+import org.elasticsearch.bwcompat.OldIndexBackwardsCompatibilityTests;
+import org.elasticsearch.cluster.metadata.IndexMetaData;
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.collect.Tuple;
+import org.elasticsearch.common.io.FileSystemUtils;
+import org.elasticsearch.common.settings.ImmutableSettings;
+import org.elasticsearch.env.NodeEnvironment;
+import org.elasticsearch.gateway.MetaDataStateFormat;
+import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.index.shard.ShardPath;
+import org.elasticsearch.index.shard.ShardStateMetaData;
+import org.elasticsearch.test.ElasticsearchTestCase;
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.nio.file.*;
+import java.util.*;
+ */
+public class MultiDataPathUpgraderTests extends ElasticsearchTestCase {
+ public void testUpgradeRandomPaths() throws IOException {
+ try (NodeEnvironment nodeEnvironment = newNodeEnvironment()) {
+ final String uuid = Strings.base64UUID();
+ final Path[] shardDataPaths = nodeEnvironment.shardPaths(new ShardId("foo", 0));
+ int numIdxFiles = 0;
+ int numTranslogFiles = 0;
+ int metaStateVersion = 0;
+ for (Path shardPath : shardDataPaths) {
+ final Path translog = shardPath.resolve(ShardPath.TRANSLOG_FOLDER_NAME);
+ final Path idx = shardPath.resolve(ShardPath.INDEX_FOLDER_NAME);
+ Files.createDirectories(translog);
+ Files.createDirectories(idx);
+ int numFiles = randomIntBetween(1, 10);
+ for (int i = 0; i < numFiles; i++, numIdxFiles++) {
+ String filename = Integer.toString(numIdxFiles);
+ try (BufferedWriter w = Files.newBufferedWriter(idx.resolve(filename + ".tst"), Charsets.UTF_8)) {
+ w.write(filename);
+ }
+ }
+ numFiles = randomIntBetween(1, 10);
+ for (int i = 0; i < numFiles; i++, numTranslogFiles++) {
+ String filename = Integer.toString(numTranslogFiles);
+ try (BufferedWriter w = Files.newBufferedWriter(translog.resolve(filename + ".translog"), Charsets.UTF_8)) {
+ w.write(filename);
+ }
+ }
+ ++metaStateVersion;
+ ShardStateMetaData.FORMAT.write(new ShardStateMetaData(metaStateVersion, true, uuid), metaStateVersion, shardDataPaths);
+ }
+ final Path path = randomFrom(shardDataPaths);
+ ShardPath targetPath = new ShardPath(path, path, uuid, new ShardId("foo", 0));
+ MultiDataPathUpgrader helper = new MultiDataPathUpgrader(nodeEnvironment);
+ helper.upgrade(new ShardId("foo", 0), targetPath);
+ assertFalse(helper.needsUpgrading(new ShardId("test", 0), ImmutableSettings.EMPTY));
+ if (shardDataPaths.length > 1) {
+ for (Path shardPath : shardDataPaths) {
+ if (shardPath.equals(targetPath.getDataPath())) {
+ continue;
+ }
+ final Path translog = shardPath.resolve(ShardPath.TRANSLOG_FOLDER_NAME);
+ final Path idx = shardPath.resolve(ShardPath.INDEX_FOLDER_NAME);
+ final Path state = shardPath.resolve(MetaDataStateFormat.STATE_DIR_NAME);
+ assertEquals(0, FileSystemUtils.files(translog).length);
+ assertEquals(0, FileSystemUtils.files(idx).length);
+ assertFalse(Files.exists(state));
+ }
+ }
+ final ShardStateMetaData stateMetaData = ShardStateMetaData.FORMAT.loadLatestState(logger, targetPath.getShardStatePath());
+ assertEquals(metaStateVersion, stateMetaData.version);
+ assertTrue(stateMetaData.primary);
+ assertEquals(uuid, stateMetaData.indexUUID);
+ final Path translog = targetPath.getDataPath().resolve(ShardPath.TRANSLOG_FOLDER_NAME);
+ final Path idx = targetPath.getDataPath().resolve(ShardPath.INDEX_FOLDER_NAME);
+ Files.deleteIfExists(idx.resolve("write.lock"));
+ assertEquals(numTranslogFiles, FileSystemUtils.files(translog).length);
+ assertEquals(numIdxFiles, FileSystemUtils.files(idx).length);
+ final HashSet translogFiles = Sets.newHashSet(FileSystemUtils.files(translog));
+ for (int i = 0; i < numTranslogFiles; i++) {
+ final String name = Integer.toString(i);
+ translogFiles.contains(translog.resolve(name + ".translog"));
+ byte[] content = Files.readAllBytes(translog.resolve(name + ".translog"));
+ assertEquals(name , new String(content, Charsets.UTF_8));
+ }
+ final HashSet idxFiles = Sets.newHashSet(FileSystemUtils.files(idx));
+ for (int i = 0; i < numIdxFiles; i++) {
+ final String name = Integer.toString(i);
+ idxFiles.contains(idx.resolve(name + ".tst"));
+ byte[] content = Files.readAllBytes(idx.resolve(name + ".tst"));
+ assertEquals(name , new String(content, Charsets.UTF_8));
+ }
+ }
+ }
+ /**
+ * Run upgrade on a real bwc index
+ */
+ public void testUpgradeRealIndex() throws IOException, URISyntaxException {
+ List indexes = new ArrayList<>();
+ URL dirUrl = OldIndexBackwardsCompatibilityTests.class.getResource(".");
+ Path dir = Paths.get(dirUrl.toURI());
+ try (DirectoryStream stream = Files.newDirectoryStream(dir, "index-*.zip")) {
+ for (Path path : stream) {
+ indexes.add(path);
+ }
+ }
+ final Path path = randomFrom(indexes);
+ final String indexName = path.getFileName().toString().replace(".zip", "");
+ try (NodeEnvironment nodeEnvironment = newNodeEnvironment()) {
+ Path indexDir = newTempDirPath();
+ Path dataDir = indexDir.resolve("data").resolve("bwc_" + indexName.replace("-", "_")).resolve("nodes").resolve("0").resolve("indices");
+ final Path[] paths = nodeEnvironment.nodeDataPaths();
+ try (InputStream stream = Files.newInputStream(path)) {
+ TestUtil.unzip(stream, indexDir);
+ }
+ assertTrue(Files.exists(dataDir));
+ Path primary = paths[0].resolve("indices");
+ Files.createDirectories(primary.getParent());
+ Files.move(dataDir, primary);
+ assertFalse(Files.exists(dataDir));
+ assertTrue(Files.exists(primary));
+ Path primaryIndex = primary.resolve("test").resolve("0").resolve("index");
+ MultiDataPathUpgrader helper = new MultiDataPathUpgrader(nodeEnvironment);
+ final ShardPath shardPath = new ShardPath(nodeEnvironment.shardPaths(new ShardId("test", 0))[0], nodeEnvironment.shardPaths(new ShardId("test", 0))[0], IndexMetaData.INDEX_UUID_NA_VALUE, new ShardId("test", 0));
+ helper.checkIndex(shardPath);
+ logger.info(Arrays.toString(FileSystemUtils.files(primaryIndex)));
+ boolean indexFileMoved = false;
+ for (int i = 1; i < paths.length; i++) {
+ final Path shardDir = paths[i].resolve("indices").resolve("test").resolve("0");
+ Files.createDirectories(shardDir);
+ Files.createDirectories(shardDir.resolve("index"));
+ Files.copy(primary.resolve("test").resolve("0").resolve("_state"), shardDir.resolve("_state"));
+ final Path[] files = FileSystemUtils.files(primaryIndex);
+ if (files.length > 1) {
+ int numFiles = scaledRandomIntBetween(1, files.length - 1);
+ for (int j = 0; j < numFiles; j++) {
+ final Path fileName = files[j].getFileName();
+ logger.info("move away {}", fileName);
+ indexFileMoved |= (fileName.toString().startsWith("_checksums") || fileName.toString().equals("write.lock")) == false;
+ Files.move(files[j], shardDir.resolve("index").resolve(fileName));
+ }
+ }
+ }
+ final Path translogPath = randomFrom(paths);
+ if (translogPath != primary) {
+ Files.move(primary.resolve("test").resolve("0").resolve("translog"), translogPath.resolve("indices").resolve("test").resolve("0").resolve("translog"));
+ }
+ if (indexFileMoved) {
+ try {
+ helper.checkIndex(new ShardPath(nodeEnvironment.shardPaths(new ShardId("test", 0))[0], nodeEnvironment.shardPaths(new ShardId("test", 0))[0], IndexMetaData.INDEX_UUID_NA_VALUE, new ShardId("test", 0)));
+ fail("check index should fail " + Arrays.toString(FileSystemUtils.files(primary.resolve("test").resolve("0").resolve("index"))));
+ } catch (Exception ex) {
+ // expected
+ }
+ } else {
+ helper.checkIndex(new ShardPath(nodeEnvironment.shardPaths(new ShardId("test", 0))[0], nodeEnvironment.shardPaths(new ShardId("test", 0))[0], IndexMetaData.INDEX_UUID_NA_VALUE, new ShardId("test", 0)));
+ }
+ helper.upgrade(new ShardId("test", 0), shardPath);
+ helper.checkIndex(new ShardPath(nodeEnvironment.shardPaths(new ShardId("test", 0))[0], nodeEnvironment.shardPaths(new ShardId("test", 0))[0], IndexMetaData.INDEX_UUID_NA_VALUE, new ShardId("test", 0)));
+ assertFalse(helper.needsUpgrading(new ShardId("test", 0), ImmutableSettings.EMPTY));
+ }
+ }
+ public void testNeedsUpgrade() throws IOException {
+ try (NodeEnvironment nodeEnvironment = newNodeEnvironment()) {
+ String uuid = Strings.randomBase64UUID();
+ final ShardId shardId = new ShardId("foo", 0);
+ ShardStateMetaData.FORMAT.write(new ShardStateMetaData(1, true, uuid), 1, nodeEnvironment.shardPaths(shardId));
+ MultiDataPathUpgrader helper = new MultiDataPathUpgrader(nodeEnvironment);
+ boolean multiDataPaths = nodeEnvironment.nodeDataPaths().length > 1;
+ final ImmutableSettings.Builder builder = ImmutableSettings.settingsBuilder();
+ boolean customPath = randomBoolean();
+ if (customPath) {
+ builder.put(IndexMetaData.SETTING_DATA_PATH, "/foo/bar");
+ }
+ boolean needsUpgrading = helper.needsUpgrading(shardId, builder.build());
+ if (multiDataPaths && customPath == false) {
+ assertTrue(needsUpgrading);
+ } else {
+ assertFalse(needsUpgrading);
+ }
+ }
+ }
+ public void testPickTargetShardPath() throws IOException {
+ try (NodeEnvironment nodeEnvironment = newNodeEnvironment()) {
+ ShardId shard = new ShardId("foo", 0);
+ final Path[] paths = nodeEnvironment.shardPaths(shard);
+ if (paths.length == 1) {
+ MultiDataPathUpgrader helper = new MultiDataPathUpgrader(nodeEnvironment);
+ try {
+ helper.pickShardPath(new ShardId("foo", 0), ImmutableSettings.settingsBuilder().put(IndexMetaData.SETTING_UUID, Strings.randomBase64UUID()).build());
+ fail("one path needs no upgrading");
+ } catch (IllegalStateException ex) {
+ // only one path
+ }
+ } else {
+ final Map> pathToSpace = new HashMap<>();
+ final Path expectedPath;
+ if (randomBoolean()) { // path with most of the file bytes
+ expectedPath = randomFrom(paths);
+ long[] used = new long[paths.length];
+ long sumSpaceUsed = 0;
+ for (int i = 0; i < used.length; i++) {
+ long spaceUsed = paths[i] == expectedPath ? randomIntBetween(101, 200) : randomIntBetween(10, 100);
+ sumSpaceUsed += spaceUsed;
+ used[i] = spaceUsed;
+ }
+ for (int i = 0; i < used.length; i++) {
+ long availalbe = randomIntBetween((int)(2*sumSpaceUsed-used[i]), 4 * (int)sumSpaceUsed);
+ pathToSpace.put(paths[i], new Tuple<>(availalbe, used[i]));
+ }
+ } else { // path with largest available space
+ expectedPath = randomFrom(paths);
+ long[] used = new long[paths.length];
+ long sumSpaceUsed = 0;
+ for (int i = 0; i < used.length; i++) {
+ long spaceUsed = randomIntBetween(10, 100);
+ sumSpaceUsed += spaceUsed;
+ used[i] = spaceUsed;
+ }
+ for (int i = 0; i < used.length; i++) {
+ long availalbe = paths[i] == expectedPath ? randomIntBetween((int)(sumSpaceUsed), (int)(2*sumSpaceUsed)) : randomIntBetween(0, (int)(sumSpaceUsed) - 1) ;
+ pathToSpace.put(paths[i], new Tuple<>(availalbe, used[i]));
+ }
+ }
+ MultiDataPathUpgrader helper = new MultiDataPathUpgrader(nodeEnvironment) {
+ @Override
+ protected long getUsabelSpace(Path path) throws IOException {
+ return pathToSpace.get(path).v1();
+ }
+ @Override
+ protected long getSpaceUsedByShard(Path path) throws IOException {
+ return pathToSpace.get(path).v2();
+ }
+ };
+ String uuid = Strings.randomBase64UUID();
+ ShardStateMetaData.FORMAT.write(new ShardStateMetaData(1, true, uuid), 1, paths);
+ final ShardPath shardPath = helper.pickShardPath(new ShardId("foo", 0), ImmutableSettings.settingsBuilder().put(IndexMetaData.SETTING_UUID, uuid).build());
+ assertEquals(expectedPath, shardPath.getDataPath());
+ assertEquals(expectedPath, shardPath.getShardStatePath());
+ }
+ MultiDataPathUpgrader helper = new MultiDataPathUpgrader(nodeEnvironment) {
+ @Override
+ protected long getUsabelSpace(Path path) throws IOException {
+ return randomIntBetween(0, 10);
+ }
+ @Override
+ protected long getSpaceUsedByShard(Path path) throws IOException {
+ return randomIntBetween(11, 20);
+ }
+ };
+ try {
+ helper.pickShardPath(new ShardId("foo", 0), ImmutableSettings.settingsBuilder().put(IndexMetaData.SETTING_UUID, Strings.randomBase64UUID()).build());
+ fail("not enough space");
+ } catch (IllegalStateException ex) {
+ // not enough space
+ }
+ }
+ }
diff --git a/src/test/java/org/elasticsearch/env/NodeEnvironmentTests.java b/src/test/java/org/elasticsearch/env/NodeEnvironmentTests.java
index e8ce1488dc846..6f334a5c888cb 100644
--- a/src/test/java/org/elasticsearch/env/NodeEnvironmentTests.java
+++ b/src/test/java/org/elasticsearch/env/NodeEnvironmentTests.java
@@ -341,8 +341,10 @@ public void testCustomDataPaths() throws Exception {
assertFalse("no settings should mean no custom data path", NodeEnvironment.hasCustomDataPath(s1));
assertTrue("settings with path_data should have a custom data path", NodeEnvironment.hasCustomDataPath(s2));
- assertThat(env.shardDataPaths(sid, s1), equalTo(env.shardPaths(sid)));
- assertThat(env.shardDataPaths(sid, s2), equalTo(new Path[] {Paths.get("/tmp/foo/0/myindex/0")}));
+ assertThat(env.shardPaths(sid), equalTo(env.shardPaths(sid)));
+ assertFalse(NodeEnvironment.hasCustomDataPath(s1));
+ assertThat(env.resolveCustomLocation(s2, sid), equalTo(Paths.get("/tmp/foo/0/myindex/0")));
+ assertTrue(NodeEnvironment.hasCustomDataPath(s2));
assertThat("shard paths with a custom data_path should contain only regular paths",
@@ -355,8 +357,8 @@ public void testCustomDataPaths() throws Exception {
NodeEnvironment env2 = newNodeEnvironment(dataPaths,
ImmutableSettings.builder().put(NodeEnvironment.ADD_NODE_ID_TO_CUSTOM_PATH, false).build());
- assertThat(env2.shardDataPaths(sid, s1), equalTo(env2.shardPaths(sid)));
- assertThat(env2.shardDataPaths(sid, s2), equalTo(new Path[] {Paths.get("/tmp/foo/myindex/0")}));
+ assertThat(env2.shardPaths(sid), equalTo(env2.shardPaths(sid)));
+ assertThat(env2.resolveCustomLocation(s2, sid), equalTo(Paths.get("/tmp/foo/myindex/0")));
assertThat("shard paths with a custom data_path should contain only regular paths",
diff --git a/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
index 89da15178487a..6877b555f54af 100644
--- a/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
+++ b/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
@@ -69,7 +69,6 @@
import org.elasticsearch.index.shard.ShardUtils;
import org.elasticsearch.index.store.DirectoryService;
import org.elasticsearch.index.store.Store;
-import org.elasticsearch.index.store.distributor.LeastUsedDistributor;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogSizeMatcher;
import org.elasticsearch.index.translog.fs.FsTranslog;
@@ -214,8 +213,8 @@ protected Store createStore() throws IOException {
protected Store createStore(final Directory directory) throws IOException {
final DirectoryService directoryService = new DirectoryService(shardId, EMPTY_SETTINGS) {
- public Directory[] build() throws IOException {
- return new Directory[]{directory};
+ public Directory newDirectory() throws IOException {
+ return directory;
@@ -223,7 +222,7 @@ public long throttleTimeInNanos() {
return 0;
- return new Store(shardId, EMPTY_SETTINGS, directoryService, new LeastUsedDistributor(directoryService), new DummyShardLock(shardId));
+ return new Store(shardId, EMPTY_SETTINGS, directoryService, new DummyShardLock(shardId));
protected Translog createTranslog() throws IOException {
diff --git a/src/test/java/org/elasticsearch/index/engine/ShadowEngineTests.java b/src/test/java/org/elasticsearch/index/engine/ShadowEngineTests.java
index ab09e51764548..4ee2c42a325e2 100644
--- a/src/test/java/org/elasticsearch/index/engine/ShadowEngineTests.java
+++ b/src/test/java/org/elasticsearch/index/engine/ShadowEngineTests.java
@@ -27,7 +27,6 @@
import org.apache.lucene.index.*;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.store.Directory;
-import org.apache.lucene.store.FilterDirectory;
import org.apache.lucene.store.MockDirectoryWrapper;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.metadata.IndexMetaData;
@@ -41,7 +40,6 @@
import org.elasticsearch.index.codec.CodecService;
import org.elasticsearch.index.deletionpolicy.KeepOnlyLastDeletionPolicy;
import org.elasticsearch.index.deletionpolicy.SnapshotDeletionPolicy;
-import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
import org.elasticsearch.index.indexing.ShardIndexingService;
import org.elasticsearch.index.indexing.slowlog.ShardSlowLogIndexingService;
import org.elasticsearch.index.mapper.ParseContext;
@@ -58,7 +56,6 @@
import org.elasticsearch.index.store.DirectoryService;
import org.elasticsearch.index.store.DirectoryUtils;
import org.elasticsearch.index.store.Store;
-import org.elasticsearch.index.store.distributor.LeastUsedDistributor;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.fs.FsTranslog;
import org.elasticsearch.test.DummyShardLock;
@@ -190,8 +187,8 @@ protected Store createStore(Path p) throws IOException {
protected Store createStore(final Directory directory) throws IOException {
final DirectoryService directoryService = new DirectoryService(shardId, EMPTY_SETTINGS) {
- public Directory[] build() throws IOException {
- return new Directory[]{ directory };
+ public Directory newDirectory() throws IOException {
+ return directory;
@@ -199,7 +196,7 @@ public long throttleTimeInNanos() {
return 0;
- return new Store(shardId, EMPTY_SETTINGS, directoryService, new LeastUsedDistributor(directoryService), new DummyShardLock(shardId));
+ return new Store(shardId, EMPTY_SETTINGS, directoryService, new DummyShardLock(shardId));
protected Translog createTranslog() throws IOException {
diff --git a/src/test/java/org/elasticsearch/index/merge/policy/MergePolicySettingsTest.java b/src/test/java/org/elasticsearch/index/merge/policy/MergePolicySettingsTest.java
index f3ac0e3ae24b4..a045f3bbb6e64 100644
--- a/src/test/java/org/elasticsearch/index/merge/policy/MergePolicySettingsTest.java
+++ b/src/test/java/org/elasticsearch/index/merge/policy/MergePolicySettingsTest.java
@@ -33,7 +33,6 @@
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.DirectoryService;
import org.elasticsearch.index.store.Store;
-import org.elasticsearch.index.store.distributor.LeastUsedDistributor;
import org.elasticsearch.test.DummyShardLock;
import org.elasticsearch.test.ElasticsearchTestCase;
import org.junit.Test;
@@ -315,8 +314,8 @@ public Settings build(boolean value) {
protected Store createStore(Settings settings) throws IOException {
final DirectoryService directoryService = new DirectoryService(shardId, EMPTY_SETTINGS) {
- public Directory[] build() throws IOException {
- return new Directory[] { new RAMDirectory() } ;
+ public Directory newDirectory() throws IOException {
+ return new RAMDirectory() ;
@@ -324,7 +323,7 @@ public long throttleTimeInNanos() {
return 0;
- return new Store(shardId, settings, directoryService, new LeastUsedDistributor(directoryService), new DummyShardLock(shardId));
+ return new Store(shardId, settings, directoryService, new DummyShardLock(shardId));
diff --git a/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
index 1b12e39bcb38a..c3b9ad816003e 100644
--- a/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
+++ b/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
@@ -25,6 +25,7 @@
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.common.logging.ESLogger;
+import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.indices.IndicesService;
diff --git a/src/test/java/org/elasticsearch/index/shard/ShardPathTests.java b/src/test/java/org/elasticsearch/index/shard/ShardPathTests.java
new file mode 100644
index 0000000000000..6bbd70e36ac44
--- /dev/null
+++ b/src/test/java/org/elasticsearch/index/shard/ShardPathTests.java
@@ -0,0 +1,56 @@
+package org.elasticsearch.index.shard;
+import org.elasticsearch.cluster.metadata.IndexMetaData;
+import org.elasticsearch.common.settings.ImmutableSettings;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.env.NodeEnvironment;
+import org.elasticsearch.test.ElasticsearchTestCase;
+import java.io.IOException;
+import java.nio.file.Path;
+import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
+ */
+public class ShardPathTests extends ElasticsearchTestCase {
+ public void testLoadShardPath() throws IOException {
+ try (final NodeEnvironment env = newNodeEnvironment(settingsBuilder().build())) {
+ ImmutableSettings.Builder builder = settingsBuilder().put(IndexMetaData.SETTING_UUID, "0xDEADBEEF");
+ Settings settings = builder.build();
+ ShardId shardId = new ShardId("foo", 0);
+ Path[] paths = env.shardPaths(shardId);
+ Path path = randomFrom(paths);
+ if (randomBoolean()) {
+ ShardStateMetaData.FORMAT.write(new ShardStateMetaData(1, true, "0xDEADBEEF"), 1, paths);
+ }
+ ShardStateMetaData.FORMAT.write(new ShardStateMetaData(2, true, "0xDEADBEEF"), 2, path);
+ ShardPath shardPath = ShardPath.loadShardPath(logger, env, shardId, settings);
+ assertEquals(path, shardPath.getDataPath());
+ assertEquals("0xDEADBEEF", shardPath.getIndexUUID());
+ assertEquals("foo", shardPath.getShardId().getIndex());
+ assertEquals(path.resolve("translog"), shardPath.resolveTranslog());
+ assertEquals(path.resolve("index"), shardPath.resolveIndex());
+ }
+ }
diff --git a/src/test/java/org/elasticsearch/index/store/CorruptedFileTest.java b/src/test/java/org/elasticsearch/index/store/CorruptedFileTest.java
index e955baa6ad2ff..fb9cebe5d6ca8 100644
--- a/src/test/java/org/elasticsearch/index/store/CorruptedFileTest.java
+++ b/src/test/java/org/elasticsearch/index/store/CorruptedFileTest.java
@@ -528,11 +528,13 @@ private ShardRouting corruptRandomFile(final boolean includePerCommitFiles) thro
String path = info.getPath();
final String relativeDataLocationPath = "indices/test/" + Integer.toString(shardRouting.getId()) + "/index";
Path file = Paths.get(path).resolve(relativeDataLocationPath);
- try (DirectoryStream stream = Files.newDirectoryStream(file)) {
- for (Path item : stream) {
- if (Files.isRegularFile(item) && "write.lock".equals(item.getFileName().toString()) == false) {
- if (includePerCommitFiles || isPerSegmentFile(item.getFileName().toString())) {
- files.add(item);
+ if (Files.exists(file)) {
+ try (DirectoryStream stream = Files.newDirectoryStream(file)) {
+ for (Path item : stream) {
+ if (Files.isRegularFile(item) && "write.lock".equals(item.getFileName().toString()) == false) {
+ if (includePerCommitFiles || isPerSegmentFile(item.getFileName().toString())) {
+ files.add(item);
+ }
@@ -637,9 +639,11 @@ public List listShardFiles(ShardRouting routing) throws IOException {
for (FsStats.Info info : nodeStatses.getNodes()[0].getFs()) {
String path = info.getPath();
Path file = Paths.get(path).resolve("indices/test/" + Integer.toString(routing.getId()) + "/index");
- try (DirectoryStream stream = Files.newDirectoryStream(file)) {
- for (Path item : stream) {
- files.add(item);
+ if (Files.exists(file)) {
+ try (DirectoryStream stream = Files.newDirectoryStream(file)) {
+ for (Path item : stream) {
+ files.add(item);
+ }
diff --git a/src/test/java/org/elasticsearch/index/store/CorruptedTranslogTests.java b/src/test/java/org/elasticsearch/index/store/CorruptedTranslogTests.java
index d9250d36e398a..976942c3d7f2c 100644
--- a/src/test/java/org/elasticsearch/index/store/CorruptedTranslogTests.java
+++ b/src/test/java/org/elasticsearch/index/store/CorruptedTranslogTests.java
@@ -126,14 +126,16 @@ private void corruptRandomTranslogFiles() throws IOException {
String path = info.getPath();
final String relativeDataLocationPath = "indices/test/" + Integer.toString(shardRouting.getId()) + "/translog";
Path file = Paths.get(path).resolve(relativeDataLocationPath);
- logger.info("--> path: {}", file);
- try (DirectoryStream stream = Files.newDirectoryStream(file)) {
- for (Path item : stream) {
- logger.info("--> File: {}", item);
- if (Files.isRegularFile(item) && item.getFileName().toString().startsWith("translog-")) {
- files.add(item);
- }
+ if (Files.exists(file)) {
+ logger.info("--> path: {}", file);
+ try (DirectoryStream stream = Files.newDirectoryStream(file)) {
+ for (Path item : stream) {
+ logger.info("--> File: {}", item);
+ if (Files.isRegularFile(item) && item.getFileName().toString().startsWith("translog-")) {
+ files.add(item);
+ }
+ }
diff --git a/src/test/java/org/elasticsearch/index/store/DistributorDirectoryTest.java b/src/test/java/org/elasticsearch/index/store/DistributorDirectoryTest.java
deleted file mode 100644
index c00e275cc1761..0000000000000
--- a/src/test/java/org/elasticsearch/index/store/DistributorDirectoryTest.java
+++ /dev/null
@@ -1,208 +0,0 @@
-package org.elasticsearch.index.store;
-import com.carrotsearch.randomizedtesting.annotations.Listeners;
-import com.carrotsearch.randomizedtesting.annotations.ThreadLeakLingering;
-import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope;
-import com.carrotsearch.randomizedtesting.annotations.TimeoutSuite;
-import com.carrotsearch.randomizedtesting.generators.RandomInts;
-import com.carrotsearch.randomizedtesting.generators.RandomPicks;
-import com.google.common.collect.ImmutableSet;
-import org.apache.lucene.index.IndexFileNames;
-import org.apache.lucene.store.BaseDirectoryTestCase;
-import org.apache.lucene.store.Directory;
-import org.apache.lucene.store.FilterDirectory;
-import org.apache.lucene.store.IOContext;
-import org.apache.lucene.store.IndexOutput;
-import org.apache.lucene.store.MockDirectoryWrapper;
-import org.apache.lucene.util.IOUtils;
-import org.apache.lucene.util.LuceneTestCase;
-import org.apache.lucene.util.TimeUnits;
-import org.elasticsearch.common.logging.ESLogger;
-import org.elasticsearch.common.logging.Loggers;
-import org.elasticsearch.index.store.distributor.Distributor;
-import org.elasticsearch.test.junit.listeners.LoggingListener;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.nio.file.NoSuchFileException;
-import java.nio.file.Path;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Set;
-@ThreadLeakLingering(linger = 5000) // 5 sec lingering
-@TimeoutSuite(millis = 5 * TimeUnits.MINUTE)
-@LuceneTestCase.SuppressSysoutChecks(bugUrl = "we log a lot on purpose")
-public class DistributorDirectoryTest extends BaseDirectoryTestCase {
- protected final ESLogger logger = Loggers.getLogger(getClass());
- @Override
- protected Directory getDirectory(Path path) throws IOException {
- Directory[] directories = new Directory[1 + random().nextInt(5)];
- for (int i = 0; i < directories.length; i++) {
- directories[i] = newDirectory();
- if (directories[i] instanceof MockDirectoryWrapper) {
- // TODO: fix this test to handle virus checker
- ((MockDirectoryWrapper) directories[i]).setEnableVirusScanner(false);
- }
- }
- return new DistributorDirectory(directories);
- }
- // #7306: don't invoke the distributor when we are opening an already existing file
- public void testDoNotCallDistributorOnRead() throws Exception {
- Directory dir = newDirectory();
- dir.createOutput("one.txt", IOContext.DEFAULT).close();
- final Directory[] dirs = new Directory[] {dir};
- Distributor distrib = new Distributor() {
- @Override
- public Directory primary() {
- return dirs[0];
- }
- @Override
- public Directory[] all() {
- return dirs;
- }
- @Override
- public synchronized Directory any() {
- throw new IllegalStateException("any should not be called");
- }
- };
- DistributorDirectory dd = new DistributorDirectory(distrib);
- assertEquals(0, dd.fileLength("one.txt"));
- dd.openInput("one.txt", IOContext.DEFAULT).close();
- try {
- dd.createOutput("three.txt", IOContext.DEFAULT).close();
- fail("didn't hit expected exception");
- } catch (IllegalStateException ise) {
- // expected
- }
- dd.close();
- }
- public void testRenameFiles() throws IOException {
- final int iters = 1 + random().nextInt(10);
- for (int i = 0; i < iters; i++) {
- Directory[] dirs = new Directory[1 + random().nextInt(5)];
- for (int j=0; j < dirs.length; j++) {
- MockDirectoryWrapper directory = newMockDirectory();
- directory.setEnableVirusScanner(false);
- directory.setCheckIndexOnClose(false);
- dirs[j] = directory;
- }
- DistributorDirectory dd = new DistributorDirectory(dirs);
- String file = RandomPicks.randomFrom(random(), Arrays.asList(Store.CHECKSUMS_PREFIX, IndexFileNames.OLD_SEGMENTS_GEN, IndexFileNames.SEGMENTS, IndexFileNames.PENDING_SEGMENTS));
- String tmpFileName = RandomPicks.randomFrom(random(), Arrays.asList("recovery.", "foobar.", "test.")) + Math.max(0, Math.abs(random().nextLong())) + "." + file;
- try (IndexOutput out = dd.createOutput(tmpFileName, IOContext.DEFAULT)) {
- out.writeInt(1);
- }
- Directory theDir = null;
- for (Directory d : dirs) {
- try {
- if (d.fileLength(tmpFileName) > 0) {
- theDir = d;
- break;
- }
- } catch (IOException ex) {
- // nevermind
- }
- }
- assertNotNull("file must be in at least one dir", theDir);
- dd.renameFile(tmpFileName, file);
- try {
- dd.fileLength(tmpFileName);
- fail("file ["+tmpFileName + "] was renamed but still exists");
- } catch (FileNotFoundException | NoSuchFileException ex) {
- // all is well
- }
- try {
- theDir.fileLength(tmpFileName);
- fail("file ["+tmpFileName + "] was renamed but still exists");
- } catch (FileNotFoundException | NoSuchFileException ex) {
- // all is well
- }
- assertEquals(theDir.fileLength(file), 4);
- try (IndexOutput out = dd.createOutput("foo.bar", IOContext.DEFAULT)) {
- out.writeInt(1);
- }
- assertNotNull(dd);
- if (dd.getDirectory("foo.bar") != dd.getDirectory(file)) {
- try {
- dd.renameFile("foo.bar", file);
- fail("target file already exists in a different directory");
- } catch (IOException ex) {
- // target file already exists
- }
- }
- IOUtils.close(dd);
- }
- }
- public void testSync() throws IOException {
- final Set syncedFiles = new HashSet<>();
- final Directory[] directories = new Directory[RandomInts.randomIntBetween(random(), 1, 5)];
- for (int i = 0; i < directories.length; ++i) {
- final Directory dir = newDirectory();
- directories[i] = new FilterDirectory(dir) {
- @Override
- public void sync(Collection names) throws IOException {
- super.sync(names);
- syncedFiles.addAll(names);
- }
- };
- }
- final Directory directory = new DistributorDirectory(directories);
- for (String file : Arrays.asList("a.bin", "b.bin")) {
- try (IndexOutput out = directory.createOutput(file, IOContext.DEFAULT)) {
- out.writeInt(random().nextInt());
- }
- }
- // syncing on a missing file throws an exception
- try {
- directory.sync(Arrays.asList("a.bin", "c.bin"));
- } catch (FileNotFoundException e) {
- // expected
- }
- assertEquals(ImmutableSet.of(), syncedFiles);
- // but syncing on existing files actually delegates
- directory.sync(Arrays.asList("a.bin", "b.bin"));
- assertEquals(ImmutableSet.of("a.bin", "b.bin"), syncedFiles);
- directory.close();
- }
diff --git a/src/test/java/org/elasticsearch/index/store/DistributorInTheWildTest.java b/src/test/java/org/elasticsearch/index/store/DistributorInTheWildTest.java
deleted file mode 100644
index 6ccc3cd8696c7..0000000000000
--- a/src/test/java/org/elasticsearch/index/store/DistributorInTheWildTest.java
+++ /dev/null
@@ -1,206 +0,0 @@
-package org.elasticsearch.index.store;
-import com.carrotsearch.randomizedtesting.annotations.Listeners;
-import com.carrotsearch.randomizedtesting.annotations.ThreadLeakLingering;
-import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope;
-import org.apache.lucene.index.DirectoryReader;
-import org.apache.lucene.index.IndexReader;
-import org.apache.lucene.index.ThreadedIndexingAndSearchingTestCase;
-import org.apache.lucene.search.IndexSearcher;
-import org.apache.lucene.store.Directory;
-import org.apache.lucene.store.MockDirectoryWrapper;
-import org.apache.lucene.util.LuceneTestCase;
-import org.elasticsearch.common.logging.ESLogger;
-import org.elasticsearch.common.logging.Loggers;
-import org.elasticsearch.index.store.distributor.Distributor;
-import org.elasticsearch.test.junit.listeners.LoggingListener;
-import org.junit.Before;
-import java.io.IOException;
-import java.nio.file.Path;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.ExecutorService;
- * This test is a copy of TestNRTThreads from lucene that puts some
- * hard concurrent pressure on the directory etc. to ensure DistributorDirectory is behaving ok.
- */
-@LuceneTestCase.SuppressCodecs({ "SimpleText", "Memory", "Direct" })
-@ThreadLeakLingering(linger = 5000) // 5 sec lingering
-@LuceneTestCase.SuppressSysoutChecks(bugUrl = "we log a lot on purpose")
-public class DistributorInTheWildTest extends ThreadedIndexingAndSearchingTestCase {
- protected final ESLogger logger = Loggers.getLogger(getClass());
- private boolean useNonNrtReaders = true;
- @Override
- @Before
- public void setUp() throws Exception {
- super.setUp();
- useNonNrtReaders = random().nextBoolean();
- }
- @Override
- protected void doSearching(ExecutorService es, long stopTime) throws Exception {
- boolean anyOpenDelFiles = false;
- DirectoryReader r = DirectoryReader.open(writer, true);
- while (System.currentTimeMillis() < stopTime && !failed.get()) {
- if (random().nextBoolean()) {
- if (VERBOSE) {
- logger.info("TEST: now reopen r=" + r);
- }
- final DirectoryReader r2 = DirectoryReader.openIfChanged(r);
- if (r2 != null) {
- r.close();
- r = r2;
- }
- } else {
- if (VERBOSE) {
- logger.info("TEST: now close reader=" + r);
- }
- r.close();
- writer.commit();
- final Set openDeletedFiles = getOpenDeletedFiles(dir);
- if (openDeletedFiles.size() > 0) {
- logger.info("OBD files: " + openDeletedFiles);
- }
- anyOpenDelFiles |= openDeletedFiles.size() > 0;
- //assertEquals("open but deleted: " + openDeletedFiles, 0, openDeletedFiles.size());
- if (VERBOSE) {
- logger.info("TEST: now open");
- }
- r = DirectoryReader.open(writer, true);
- }
- if (VERBOSE) {
- logger.info("TEST: got new reader=" + r);
- }
- //logger.info("numDocs=" + r.numDocs() + "
- //openDelFileCount=" + dir.openDeleteFileCount());
- if (r.numDocs() > 0) {
- fixedSearcher = new IndexSearcher(r, es);
- smokeTestSearcher(fixedSearcher);
- runSearchThreads(System.currentTimeMillis() + 500);
- }
- }
- r.close();
- //logger.info("numDocs=" + r.numDocs() + " openDelFileCount=" + dir.openDeleteFileCount());
- final Set openDeletedFiles = getOpenDeletedFiles(dir);
- if (openDeletedFiles.size() > 0) {
- logger.info("OBD files: " + openDeletedFiles);
- }
- anyOpenDelFiles |= openDeletedFiles.size() > 0;
- assertFalse("saw non-zero open-but-deleted count", anyOpenDelFiles);
- }
- private Set getOpenDeletedFiles(Directory dir) throws IOException {
- if (random().nextBoolean() && dir instanceof MockDirectoryWrapper) {
- return ((MockDirectoryWrapper) dir).getOpenDeletedFiles();
- }
- DistributorDirectory d = DirectoryUtils.getLeaf(dir, DistributorDirectory.class, null);
- Distributor distributor = d.getDistributor();
- Set set = new HashSet<>();
- for (Directory subDir : distributor.all()) {
- Set openDeletedFiles = ((MockDirectoryWrapper) subDir).getOpenDeletedFiles();
- set.addAll(openDeletedFiles);
- }
- return set;
- }
- @Override
- protected Directory getDirectory(Directory in) {
- assert in instanceof MockDirectoryWrapper;
- if (!useNonNrtReaders) ((MockDirectoryWrapper) in).setAssertNoDeleteOpenFile(true);
- Directory[] directories = new Directory[1 + random().nextInt(5)];
- directories[0] = in;
- for (int i = 1; i < directories.length; i++) {
- final Path tempDir = createTempDir(getTestName());
- directories[i] = newMockFSDirectory(tempDir); // some subclasses rely on this being MDW
- if (!useNonNrtReaders) ((MockDirectoryWrapper) directories[i]).setAssertNoDeleteOpenFile(true);
- }
- for (Directory dir : directories) {
- ((MockDirectoryWrapper) dir).setCheckIndexOnClose(false);
- }
- try {
- if (random().nextBoolean()) {
- return new MockDirectoryWrapper(random(), new DistributorDirectory(directories));
- } else {
- return new DistributorDirectory(directories);
- }
- } catch (IOException ex) {
- throw new RuntimeException(ex);
- }
- }
- @Override
- protected void doAfterWriter(ExecutorService es) throws Exception {
- // Force writer to do reader pooling, always, so that
- // all merged segments, even for merges before
- // doSearching is called, are warmed:
- DirectoryReader.open(writer, true).close();
- }
- private IndexSearcher fixedSearcher;
- @Override
- protected IndexSearcher getCurrentSearcher() throws Exception {
- return fixedSearcher;
- }
- @Override
- protected void releaseSearcher(IndexSearcher s) throws Exception {
- if (s != fixedSearcher) {
- // Final searcher:
- s.getIndexReader().close();
- }
- }
- @Override
- protected IndexSearcher getFinalSearcher() throws Exception {
- final IndexReader r2;
- if (useNonNrtReaders) {
- if (random().nextBoolean()) {
- r2 = DirectoryReader.open(writer, true);
- } else {
- writer.commit();
- r2 = DirectoryReader.open(dir);
- }
- } else {
- r2 = DirectoryReader.open(writer, true);
- }
- return newSearcher(r2);
- }
- public void testNRTThreads() throws Exception {
- runTest("TestNRTThreads");
- }
diff --git a/src/test/java/org/elasticsearch/index/store/StoreTest.java b/src/test/java/org/elasticsearch/index/store/StoreTest.java
index 01dc0a098fd41..cebd009936f17 100644
--- a/src/test/java/org/elasticsearch/index/store/StoreTest.java
+++ b/src/test/java/org/elasticsearch/index/store/StoreTest.java
@@ -38,9 +38,6 @@
import org.elasticsearch.env.ShardLock;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;
-import org.elasticsearch.index.store.distributor.Distributor;
-import org.elasticsearch.index.store.distributor.LeastUsedDistributor;
-import org.elasticsearch.index.store.distributor.RandomWeightedDistributor;
import org.elasticsearch.test.DummyShardLock;
import org.elasticsearch.test.ElasticsearchLuceneTestCase;
import org.hamcrest.Matchers;
@@ -63,7 +60,7 @@ public class StoreTest extends ElasticsearchLuceneTestCase {
public void testRefCount() throws IOException {
final ShardId shardId = new ShardId(new Index("index"), 1);
DirectoryService directoryService = new LuceneManagedDirectoryService(random());
- Store store = new Store(shardId, ImmutableSettings.EMPTY, directoryService, randomDistributor(directoryService), new DummyShardLock(shardId));
+ Store store = new Store(shardId, ImmutableSettings.EMPTY, directoryService, new DummyShardLock(shardId));
int incs = randomIntBetween(1, 100);
for (int i = 0; i < incs; i++) {
if (randomBoolean()) {
@@ -234,7 +231,7 @@ public void write(Directory dir, SegmentInfo si, IOContext ioContext) throws IOE
public void testWriteLegacyChecksums() throws IOException {
final ShardId shardId = new ShardId(new Index("index"), 1);
DirectoryService directoryService = new LuceneManagedDirectoryService(random());
- Store store = new Store(shardId, ImmutableSettings.EMPTY, directoryService, randomDistributor(directoryService), new DummyShardLock(shardId));
+ Store store = new Store(shardId, ImmutableSettings.EMPTY, directoryService, new DummyShardLock(shardId));
// set default codec - all segments need checksums
final boolean usesOldCodec = randomBoolean();
IndexWriter writer = new IndexWriter(store.directory(), newIndexWriterConfig(random(), new MockAnalyzer(random())).setCodec(usesOldCodec ? new OldSIMockingCodec() : actualDefaultCodec()));
@@ -319,7 +316,7 @@ public void testWriteLegacyChecksums() throws IOException {
public void testNewChecksums() throws IOException {
final ShardId shardId = new ShardId(new Index("index"), 1);
DirectoryService directoryService = new LuceneManagedDirectoryService(random());
- Store store = new Store(shardId, ImmutableSettings.EMPTY, directoryService, randomDistributor(directoryService), new DummyShardLock(shardId));
+ Store store = new Store(shardId, ImmutableSettings.EMPTY, directoryService, new DummyShardLock(shardId));
// set default codec - all segments need checksums
IndexWriter writer = new IndexWriter(store.directory(), newIndexWriterConfig(random(), new MockAnalyzer(random())).setCodec(actualDefaultCodec()));
int docs = 1 + random().nextInt(100);
@@ -379,7 +376,7 @@ public void testNewChecksums() throws IOException {
public void testMixedChecksums() throws IOException {
final ShardId shardId = new ShardId(new Index("index"), 1);
DirectoryService directoryService = new LuceneManagedDirectoryService(random());
- Store store = new Store(shardId, ImmutableSettings.EMPTY, directoryService, randomDistributor(directoryService), new DummyShardLock(shardId));
+ Store store = new Store(shardId, ImmutableSettings.EMPTY, directoryService, new DummyShardLock(shardId));
// this time random codec....
IndexWriter writer = new IndexWriter(store.directory(), newIndexWriterConfig(random(), new MockAnalyzer(random())).setCodec(actualDefaultCodec()));
int docs = 1 + random().nextInt(100);
@@ -471,7 +468,7 @@ public void testMixedChecksums() throws IOException {
public void testRenameFile() throws IOException {
final ShardId shardId = new ShardId(new Index("index"), 1);
DirectoryService directoryService = new LuceneManagedDirectoryService(random(), false);
- Store store = new Store(shardId, ImmutableSettings.EMPTY, directoryService, randomDistributor(directoryService), new DummyShardLock(shardId));
+ Store store = new Store(shardId, ImmutableSettings.EMPTY, directoryService, new DummyShardLock(shardId));
IndexOutput output = store.directory().createOutput("foo.bar", IOContext.DEFAULT);
int iters = scaledRandomIntBetween(10, 100);
@@ -505,27 +502,10 @@ public void testRenameFile() throws IOException {
- DistributorDirectory distributorDirectory = DirectoryUtils.getLeaf(store.directory(), DistributorDirectory.class);
- if (distributorDirectory != null && distributorDirectory.getDirectory("foo.bar") != distributorDirectory.getDirectory("bar.foo")) {
- try {
- store.renameFile("foo.bar", "bar.foo");
- fail("target file already exists in a different directory");
- } catch (IOException ex) {
- // expected
- }
- try (IndexInput input = store.directory().openInput("bar.foo", IOContext.DEFAULT)) {
- assertThat(lastChecksum, equalTo(CodecUtil.checksumEntireFile(input)));
- }
- assertThat(store.directory().listAll().length, is(2));
- assertDeleteContent(store, directoryService);
- IOUtils.close(store);
- } else {
- store.renameFile("foo.bar", "bar.foo");
- assertThat(store.directory().listAll().length, is(1));
- assertDeleteContent(store, directoryService);
- IOUtils.close(store);
- }
+ store.renameFile("foo.bar", "bar.foo");
+ assertThat(store.directory().listAll().length, is(1));
+ assertDeleteContent(store, directoryService);
+ IOUtils.close(store);
public void testCheckIntegrity() throws IOException {
@@ -684,13 +664,11 @@ public void assertDeleteContent(Store store, DirectoryService service) throws IO
assertThat(Arrays.toString(store.directory().listAll()), store.directory().listAll().length, equalTo(0));
assertThat(store.stats().sizeInBytes(), equalTo(0l));
- for (Directory dir : service.build()) {
- assertThat(dir.listAll().length, equalTo(0));
- }
+ assertThat(service.newDirectory().listAll().length, equalTo(0));
private static final class LuceneManagedDirectoryService extends DirectoryService {
- private final Directory[] dirs;
+ private final Directory dir;
private final Random random;
public LuceneManagedDirectoryService(Random random) {
@@ -698,20 +676,17 @@ public LuceneManagedDirectoryService(Random random) {
public LuceneManagedDirectoryService(Random random, boolean preventDoubleWrite) {
super(new ShardId("fake", 1), ImmutableSettings.EMPTY);
- this.dirs = new Directory[1 + random.nextInt(5)];
- for (int i = 0; i < dirs.length; i++) {
- dirs[i] = newDirectory(random);
- if (dirs[i] instanceof MockDirectoryWrapper) {
- ((MockDirectoryWrapper)dirs[i]).setPreventDoubleWrite(preventDoubleWrite);
+ dir = StoreTest.newDirectory(random);
+ if (dir instanceof MockDirectoryWrapper) {
+ ((MockDirectoryWrapper)dir).setPreventDoubleWrite(preventDoubleWrite);
// TODO: fix this test to handle virus checker
- ((MockDirectoryWrapper)dirs[i]).setEnableVirusScanner(false);
+ ((MockDirectoryWrapper)dir).setEnableVirusScanner(false);
- }
this.random = random;
- public Directory[] build() throws IOException {
- return dirs;
+ public Directory newDirectory() throws IOException {
+ return dir;
@@ -729,13 +704,6 @@ public static void assertConsistent(Store store, Store.MetadataSnapshot metadata
- private Distributor randomDistributor(DirectoryService service) throws IOException {
- return randomDistributor(random(), service);
- }
- private Distributor randomDistributor(Random random, DirectoryService service) throws IOException {
- return random.nextBoolean() ? new LeastUsedDistributor(service) : new RandomWeightedDistributor(service);
- }
* Legacy indices without lucene CRC32 did never write or calculate checksums for segments_N files
@@ -775,7 +743,7 @@ public void testRecoveryDiff() throws IOException, InterruptedException {
final ShardId shardId = new ShardId(new Index("index"), 1);
DirectoryService directoryService = new LuceneManagedDirectoryService(random);
- Store store = new Store(shardId, ImmutableSettings.EMPTY, directoryService, randomDistributor(random, directoryService), new DummyShardLock(shardId));
+ Store store = new Store(shardId, ImmutableSettings.EMPTY, directoryService, new DummyShardLock(shardId));
IndexWriter writer = new IndexWriter(store.directory(), iwc);
final boolean lotsOfSegments = rarely(random);
for (Document d : docs) {
@@ -806,7 +774,7 @@ public void testRecoveryDiff() throws IOException, InterruptedException {
final ShardId shardId = new ShardId(new Index("index"), 1);
DirectoryService directoryService = new LuceneManagedDirectoryService(random);
- store = new Store(shardId, ImmutableSettings.EMPTY, directoryService, randomDistributor(random, directoryService), new DummyShardLock(shardId));
+ store = new Store(shardId, ImmutableSettings.EMPTY, directoryService, new DummyShardLock(shardId));
IndexWriter writer = new IndexWriter(store.directory(), iwc);
final boolean lotsOfSegments = rarely(random);
for (Document d : docs) {
@@ -907,7 +875,7 @@ public void testRecoveryDiff() throws IOException, InterruptedException {
public void testCleanupFromSnapshot() throws IOException {
final ShardId shardId = new ShardId(new Index("index"), 1);
DirectoryService directoryService = new LuceneManagedDirectoryService(random());
- Store store = new Store(shardId, ImmutableSettings.EMPTY, directoryService, randomDistributor(directoryService), new DummyShardLock(shardId));
+ Store store = new Store(shardId, ImmutableSettings.EMPTY, directoryService, new DummyShardLock(shardId));
// this time random codec....
IndexWriterConfig indexWriterConfig = newIndexWriterConfig(random(), new MockAnalyzer(random())).setCodec(actualDefaultCodec());
// we keep all commits and that allows us clean based on multiple snapshots
@@ -1016,7 +984,7 @@ public void testCleanUpWithLegacyChecksums() throws IOException {
final ShardId shardId = new ShardId(new Index("index"), 1);
DirectoryService directoryService = new LuceneManagedDirectoryService(random());
- Store store = new Store(shardId, ImmutableSettings.EMPTY, directoryService, randomDistributor(directoryService), new DummyShardLock(shardId));
+ Store store = new Store(shardId, ImmutableSettings.EMPTY, directoryService, new DummyShardLock(shardId));
for (String file : metaDataMap.keySet()) {
try (IndexOutput output = store.directory().createOutput(file, IOContext.DEFAULT)) {
BytesRef bytesRef = new BytesRef(TestUtil.randomRealisticUnicodeString(random(), 10, 1024));
@@ -1036,7 +1004,7 @@ public void testOnCloseCallback() throws IOException {
final AtomicInteger count = new AtomicInteger(0);
final ShardLock lock = new DummyShardLock(shardId);
- Store store = new Store(shardId, ImmutableSettings.EMPTY, directoryService, randomDistributor(directoryService), lock , new Store.OnClose() {
+ Store store = new Store(shardId, ImmutableSettings.EMPTY, directoryService, lock , new Store.OnClose() {
public void handle(ShardLock theLock) {
assertEquals(shardId, theLock.getShardId());
@@ -1059,7 +1027,7 @@ public void testStoreStats() throws IOException {
final ShardId shardId = new ShardId(new Index("index"), 1);
DirectoryService directoryService = new LuceneManagedDirectoryService(random());
Settings settings = ImmutableSettings.builder().put(Store.INDEX_STORE_STATS_REFRESH_INTERVAL, TimeValue.timeValueMinutes(0)).build();
- Store store = new Store(shardId, settings, directoryService, randomDistributor(directoryService), new DummyShardLock(shardId));
+ Store store = new Store(shardId, settings, directoryService, new DummyShardLock(shardId));
StoreStats stats = store.stats();
assertEquals(stats.getSize().bytes(), 0);
diff --git a/src/test/java/org/elasticsearch/index/store/distributor/DistributorTests.java b/src/test/java/org/elasticsearch/index/store/distributor/DistributorTests.java
deleted file mode 100644
index 119c763684321..0000000000000
--- a/src/test/java/org/elasticsearch/index/store/distributor/DistributorTests.java
+++ /dev/null
@@ -1,194 +0,0 @@
-package org.elasticsearch.index.store.distributor;
-import org.apache.lucene.store.*;
-import org.elasticsearch.common.settings.ImmutableSettings;
-import org.elasticsearch.index.shard.ShardId;
-import org.elasticsearch.index.store.DirectoryService;
-import org.elasticsearch.test.ElasticsearchTestCase;
-import org.junit.Test;
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import static org.hamcrest.Matchers.*;
- */
-public class DistributorTests extends ElasticsearchTestCase {
- @Test
- public void testLeastUsedDistributor() throws Exception {
- FakeFsDirectory[] directories = new FakeFsDirectory[]{
- new FakeFsDirectory("dir0", 10L),
- new FakeFsDirectory("dir1", 20L),
- new FakeFsDirectory("dir2", 30L)
- };
- FakeDirectoryService directoryService = new FakeDirectoryService(directories);
- LeastUsedDistributor distributor = new LeastUsedDistributor(directoryService) {
- @Override
- protected long getUsableSpace(Directory directory) throws IOException {
- return ((FakeFsDirectory)directory).useableSpace;
- }
- };
- for (int i = 0; i < 5; i++) {
- assertThat(distributor.any(), equalTo((Directory) directories[2]));
- }
- directories[2].setUsableSpace(5L);
- for (int i = 0; i < 5; i++) {
- assertThat(distributor.any(), equalTo((Directory) directories[1]));
- }
- directories[1].setUsableSpace(0L);
- for (int i = 0; i < 5; i++) {
- assertThat(distributor.any(), equalTo((Directory) directories[0]));
- }
- directories[0].setUsableSpace(10L);
- directories[1].setUsableSpace(20L);
- directories[2].setUsableSpace(20L);
- for (FakeFsDirectory directory : directories) {
- directory.resetAllocationCount();
- }
- for (int i = 0; i < 10000; i++) {
- ((FakeFsDirectory) distributor.any()).incrementAllocationCount();
- }
- assertThat(directories[0].getAllocationCount(), equalTo(0));
- assertThat((double) directories[1].getAllocationCount() / directories[2].getAllocationCount(), closeTo(1.0, 0.5));
- // Test failover scenario
- for (FakeFsDirectory directory : directories) {
- directory.resetAllocationCount();
- }
- directories[0].setUsableSpace(0L);
- directories[1].setUsableSpace(0L);
- directories[2].setUsableSpace(0L);
- for (int i = 0; i < 10000; i++) {
- ((FakeFsDirectory) distributor.any()).incrementAllocationCount();
- }
- for (FakeFsDirectory directory : directories) {
- assertThat(directory.getAllocationCount(), greaterThan(0));
- }
- assertThat((double) directories[0].getAllocationCount() / directories[2].getAllocationCount(), closeTo(1.0, 0.5));
- assertThat((double) directories[1].getAllocationCount() / directories[2].getAllocationCount(), closeTo(1.0, 0.5));
- }
- @Test
- public void testRandomWeightedDistributor() throws Exception {
- FakeFsDirectory[] directories = new FakeFsDirectory[]{
- new FakeFsDirectory("dir0", 10L),
- new FakeFsDirectory("dir1", 20L),
- new FakeFsDirectory("dir2", 30L)
- };
- FakeDirectoryService directoryService = new FakeDirectoryService(directories);
- RandomWeightedDistributor randomWeightedDistributor = new RandomWeightedDistributor(directoryService) {
- @Override
- protected long getUsableSpace(Directory directory) throws IOException {
- return ((FakeFsDirectory)directory).useableSpace;
- }
- };
- for (int i = 0; i < 10000; i++) {
- ((FakeFsDirectory) randomWeightedDistributor.any()).incrementAllocationCount();
- }
- for (FakeFsDirectory directory : directories) {
- assertThat(directory.getAllocationCount(), greaterThan(0));
- }
- assertThat((double) directories[1].getAllocationCount() / directories[0].getAllocationCount(), closeTo(2.0, 0.5));
- assertThat((double) directories[2].getAllocationCount() / directories[0].getAllocationCount(), closeTo(3.0, 0.5));
- for (FakeFsDirectory directory : directories) {
- directory.resetAllocationCount();
- }
- directories[1].setUsableSpace(0L);
- for (int i = 0; i < 1000; i++) {
- ((FakeFsDirectory) randomWeightedDistributor.any()).incrementAllocationCount();
- }
- assertThat(directories[0].getAllocationCount(), greaterThan(0));
- assertThat(directories[1].getAllocationCount(), equalTo(0));
- assertThat(directories[2].getAllocationCount(), greaterThan(0));
- }
- public static class FakeDirectoryService extends DirectoryService {
- private final Directory[] directories;
- public FakeDirectoryService(Directory[] directories) {
- super(new ShardId("fake", 1), ImmutableSettings.EMPTY);
- this.directories = directories;
- }
- @Override
- public Directory[] build() throws IOException {
- return directories;
- }
- @Override
- public long throttleTimeInNanos() {
- return 0;
- }
- }
- public static class FakeFsDirectory extends FSDirectory {
- public int allocationCount;
- public long useableSpace;
- public FakeFsDirectory(String path, long usableSpace) throws IOException {
- super(Paths.get(path), NoLockFactory.INSTANCE);
- allocationCount = 0;
- this.useableSpace = usableSpace;
- }
- @Override
- public IndexInput openInput(String name, IOContext context) throws IOException {
- throw new UnsupportedOperationException("Shouldn't be called in the test");
- }
- public void setUsableSpace(long usableSpace) {
- this.useableSpace = usableSpace;
- }
- public void incrementAllocationCount() {
- allocationCount++;
- }
- public int getAllocationCount() {
- return allocationCount;
- }
- public void resetAllocationCount() {
- allocationCount = 0;
- }
- }
diff --git a/src/test/java/org/elasticsearch/index/translog/AbstractSimpleTranslogTests.java b/src/test/java/org/elasticsearch/index/translog/AbstractSimpleTranslogTests.java
index 20af2562fed09..10f21e0d3d4f4 100644
--- a/src/test/java/org/elasticsearch/index/translog/AbstractSimpleTranslogTests.java
+++ b/src/test/java/org/elasticsearch/index/translog/AbstractSimpleTranslogTests.java
@@ -68,7 +68,7 @@ public void setUp() throws Exception {
translog = create();
// if a previous test failed we clean up things here
- FileSystemUtils.deleteSubDirectories(translog.locations());
+ FileSystemUtils.deleteSubDirectories(translog.location());
@@ -76,14 +76,14 @@ public void setUp() throws Exception {
public void tearDown() throws Exception {
try {
- final Path[] locations = translog.locations();
+ final Path location = translog.location();
if (translog.currentId() > 1) {
// ensure all snapshots etc are closed if this fails something was not closed
assertFileDeleted(translog, translog.currentId() - 1);
assertFileIsPresent(translog, translog.currentId());
- IOUtils.rm(locations); // delete all the locations
+ IOUtils.rm(location); // delete all the locations
} finally {
@@ -387,18 +387,14 @@ public void deleteOnRollover() throws IOException {
public void assertFileIsPresent(Translog translog, long id) {
- for (Path location : translog.locations()) {
- if (Files.exists(location.resolve(translog.getPath(id)))) {
- return;
- }
+ if(Files.exists(translog.location().resolve(translog.getPath(id)))) {
+ return;
- fail(translog.getPath(id) + " is not present in any location: " + Arrays.toString(translog.locations()));
+ fail(translog.getPath(id) + " is not present in any location: " + translog.location());
public void assertFileDeleted(Translog translog, long id) {
- for (Path location : translog.locations()) {
- assertFalse(Files.exists(location.resolve(translog.getPath(id))));
- }
+ assertFalse(Files.exists(translog.location().resolve(translog.getPath(id))));
diff --git a/src/test/java/org/elasticsearch/indices/IndicesServiceTest.java b/src/test/java/org/elasticsearch/indices/IndicesServiceTest.java
index a24352ff0ea41..0488c02041ea5 100644
--- a/src/test/java/org/elasticsearch/indices/IndicesServiceTest.java
+++ b/src/test/java/org/elasticsearch/indices/IndicesServiceTest.java
@@ -29,11 +29,10 @@
import org.elasticsearch.gateway.GatewayMetaState;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.index.shard.ShardPath;
import org.elasticsearch.test.ElasticsearchSingleNodeTest;
import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Path;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
@@ -44,6 +43,9 @@ public class IndicesServiceTest extends ElasticsearchSingleNodeTest {
public IndicesService getIndicesService() {
return getInstanceFromNode(IndicesService.class);
+ public NodeEnvironment getNodeEnvironment() {
+ return getInstanceFromNode(NodeEnvironment.class);
+ }
protected boolean resetNodeAfterTest() {
@@ -87,17 +89,14 @@ public void testDeleteIndexStore() throws Exception {
- createIndex("test");
+ test = createIndex("test");
client().prepareIndex("test", "type", "1").setSource("field", "value").setRefresh(true).get();
assertHitCount(client().prepareSearch("test").get(), 1);
IndexMetaData secondMetaData = clusterService.state().metaData().index("test");
- NodeEnvironment nodeEnv = getInstanceFromNode(NodeEnvironment.class);
- Path[] paths = nodeEnv.shardDataPaths(new ShardId("test", 0), clusterService.state().getMetaData().index("test").getSettings());
- for (Path path : paths) {
- assertTrue(Files.exists(path));
- }
+ ShardPath path = ShardPath.loadShardPath(logger, getNodeEnvironment(), new ShardId(test.index(), 0), test.getIndexSettings());
+ assertTrue(path.exists());
try {
indicesService.deleteIndexStore("boom", secondMetaData, clusterService.state());
@@ -106,9 +105,7 @@ public void testDeleteIndexStore() throws Exception {
// all good
- for (Path path : paths) {
- assertTrue(Files.exists(path));
- }
+ assertTrue(path.exists());
// now delete the old one and make sure we resolve against the name
try {
@@ -124,19 +121,20 @@ public void testDeleteIndexStore() throws Exception {
public void testPendingTasks() throws IOException {
IndicesService indicesService = getIndicesService();
IndexService test = createIndex("test");
- NodeEnvironment nodeEnv = getInstanceFromNode(NodeEnvironment.class);
- Path[] paths = nodeEnv.shardDataPaths(new ShardId(test.index(), 0), test.getIndexSettings());
+ ShardPath path = test.shard(0).shardPath();
+ assertTrue(test.shard(0).routingEntry().started());
+ ShardPath shardPath = ShardPath.loadShardPath(logger, getNodeEnvironment(), new ShardId(test.index(), 0), test.getIndexSettings());
+ assertEquals(shardPath, path);
try {
indicesService.processPendingDeletes(test.index(), test.getIndexSettings(), new TimeValue(0, TimeUnit.MILLISECONDS));
fail("can't get lock");
} catch (LockObtainFailedException ex) {
- for (Path p : paths) {
- assertTrue(Files.exists(p));
- }
+ assertTrue(path.exists());
int numPending = 1;
if (randomBoolean()) {
indicesService.addPendingDelete(new ShardId(test.index(), 0), test.getIndexSettings());
@@ -148,16 +146,14 @@ public void testPendingTasks() throws IOException {
indicesService.addPendingDelete(test.index(), test.getIndexSettings());
- for (Path p : paths) {
- assertTrue(Files.exists(p));
- }
+ assertTrue(path.exists());
assertEquals(indicesService.numPendingDeletes(test.index()), numPending);
// shard lock released... we can now delete
indicesService.processPendingDeletes(test.index(), test.getIndexSettings(), new TimeValue(0, TimeUnit.MILLISECONDS));
assertEquals(indicesService.numPendingDeletes(test.index()), 0);
- for (Path p : paths) {
- assertFalse(Files.exists(p));
- }
+ assertFalse(path.exists());
if (randomBoolean()) {
indicesService.addPendingDelete(new ShardId(test.index(), 0), test.getIndexSettings());
diff --git a/src/test/java/org/elasticsearch/indices/store/IndicesStoreIntegrationTests.java b/src/test/java/org/elasticsearch/indices/store/IndicesStoreIntegrationTests.java
index 978f85f4a0a48..ea76cf4a33e1f 100644
--- a/src/test/java/org/elasticsearch/indices/store/IndicesStoreIntegrationTests.java
+++ b/src/test/java/org/elasticsearch/indices/store/IndicesStoreIntegrationTests.java
@@ -30,6 +30,7 @@
import org.elasticsearch.cluster.routing.*;
import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.elasticsearch.common.Priority;
+import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.DiscoveryService;
@@ -42,13 +43,12 @@
import org.elasticsearch.test.disruption.SlowClusterStateProcessing;
import org.junit.Test;
+import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
@@ -59,6 +59,11 @@
@ClusterScope(scope = Scope.TEST, numDataNodes = 0)
public class IndicesStoreIntegrationTests extends ElasticsearchIntegrationTest {
+ @Override
+ protected Settings nodeSettings(int nodeOrdinal) { // simplify this and only use a single data path
+ return ImmutableSettings.settingsBuilder().put(super.nodeSettings(nodeOrdinal)).put("path.data", "").build();
+ }
public void indexCleanup() throws Exception {
final String masterNode = internalCluster().startNode(ImmutableSettings.builder().put("node.data", false));
@@ -248,12 +253,16 @@ public void onFailure(String source, Throwable t) {
private Path indexDirectory(String server, String index) {
NodeEnvironment env = internalCluster().getInstance(NodeEnvironment.class, server);
- return env.indexPaths(new Index(index))[0];
+ final Path[] paths = env.indexPaths(new Index(index));
+ assert paths.length == 1;
+ return paths[0];
private Path shardDirectory(String server, String index, int shard) {
NodeEnvironment env = internalCluster().getInstance(NodeEnvironment.class, server);
- return env.shardPaths(new ShardId(index, shard))[0];
+ final Path[] paths = env.shardPaths(new ShardId(index, shard));
+ assert paths.length == 1;
+ return paths[0];
private boolean waitForShardDeletion(final String server, final String index, final int shard) throws InterruptedException {
diff --git a/src/test/java/org/elasticsearch/indices/store/SimpleDistributorTests.java b/src/test/java/org/elasticsearch/indices/store/SimpleDistributorTests.java
deleted file mode 100644
index bfe2876c8dc75..0000000000000
--- a/src/test/java/org/elasticsearch/indices/store/SimpleDistributorTests.java
+++ /dev/null
@@ -1,160 +0,0 @@
-package org.elasticsearch.indices.store;
-import org.apache.lucene.store.Directory;
-import org.elasticsearch.env.NodeEnvironment;
-import org.elasticsearch.index.shard.IndexShard;
-import org.elasticsearch.index.store.IndexStoreModule;
-import org.elasticsearch.indices.IndicesService;
-import org.elasticsearch.test.ElasticsearchIntegrationTest;
-import org.junit.Test;
-import java.io.IOException;
-import java.nio.file.Path;
-import java.util.Locale;
-import java.util.Set;
-import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
-import static org.hamcrest.Matchers.*;
- *
- */
-public class SimpleDistributorTests extends ElasticsearchIntegrationTest {
- @Test
- public void testAvailableSpaceDetection() {
- for (IndexStoreModule.Type store : IndexStoreModule.Type.values()) {
- createIndexWithStoreType("test", store, StrictDistributor.class.getCanonicalName());
- }
- }
- @Test
- public void testDirectoryToString() throws IOException {
- internalCluster().wipeTemplates(); // no random settings please
- createIndexWithStoreType("test", IndexStoreModule.Type.NIOFS, "least_used");
- String storeString = getStoreDirectory("test", 0).toString();
- logger.info(storeString);
- Path[] dataPaths = dataPaths();
- assertThat(storeString.toLowerCase(Locale.ROOT), startsWith("store(least_used[rate_limited(niofs(" + dataPaths[0].toAbsolutePath().toString().toLowerCase(Locale.ROOT)));
- if (dataPaths.length > 1) {
- assertThat(storeString.toLowerCase(Locale.ROOT), containsString("), rate_limited(niofs(" + dataPaths[1].toAbsolutePath().toString().toLowerCase(Locale.ROOT)));
- }
- assertThat(storeString, endsWith(", type=MERGE, rate=20.0)])"));
- createIndexWithStoreType("test", IndexStoreModule.Type.NIOFS, "random");
- storeString = getStoreDirectory("test", 0).toString();
- logger.info(storeString);
- dataPaths = dataPaths();
- assertThat(storeString.toLowerCase(Locale.ROOT), startsWith("store(random[rate_limited(niofs(" + dataPaths[0].toAbsolutePath().toString().toLowerCase(Locale.ROOT)));
- if (dataPaths.length > 1) {
- assertThat(storeString.toLowerCase(Locale.ROOT), containsString("), rate_limited(niofs(" + dataPaths[1].toAbsolutePath().toString().toLowerCase(Locale.ROOT)));
- }
- assertThat(storeString, endsWith(", type=MERGE, rate=20.0)])"));
- createIndexWithStoreType("test", IndexStoreModule.Type.MMAPFS, "least_used");
- storeString = getStoreDirectory("test", 0).toString();
- logger.info(storeString);
- dataPaths = dataPaths();
- assertThat(storeString.toLowerCase(Locale.ROOT), startsWith("store(least_used[rate_limited(mmapfs(" + dataPaths[0].toAbsolutePath().toString().toLowerCase(Locale.ROOT)));
- if (dataPaths.length > 1) {
- assertThat(storeString.toLowerCase(Locale.ROOT), containsString("), rate_limited(mmapfs(" + dataPaths[1].toAbsolutePath().toString().toLowerCase(Locale.ROOT)));
- }
- assertThat(storeString, endsWith(", type=MERGE, rate=20.0)])"));
- createIndexWithStoreType("test", IndexStoreModule.Type.SIMPLEFS, "least_used");
- storeString = getStoreDirectory("test", 0).toString();
- logger.info(storeString);
- dataPaths = dataPaths();
- assertThat(storeString.toLowerCase(Locale.ROOT), startsWith("store(least_used[rate_limited(simplefs(" + dataPaths[0].toAbsolutePath().toString().toLowerCase(Locale.ROOT)));
- if (dataPaths.length > 1) {
- assertThat(storeString.toLowerCase(Locale.ROOT), containsString("), rate_limited(simplefs(" + dataPaths[1].toAbsolutePath().toString().toLowerCase(Locale.ROOT)));
- }
- assertThat(storeString, endsWith(", type=MERGE, rate=20.0)])"));
- createIndexWithStoreType("test", IndexStoreModule.Type.DEFAULT, "least_used");
- storeString = getStoreDirectory("test", 0).toString();
- logger.info(storeString);
- dataPaths = dataPaths();
- assertThat(storeString.toLowerCase(Locale.ROOT), startsWith("store(least_used[rate_limited(default(mmapfs(" + dataPaths[0].toAbsolutePath().toString().toLowerCase(Locale.ROOT)));
- assertThat(storeString.toLowerCase(Locale.ROOT), containsString("),niofs(" + dataPaths[0].toAbsolutePath().toString().toLowerCase(Locale.ROOT)));
- if (dataPaths.length > 1) {
- assertThat(storeString.toLowerCase(Locale.ROOT), containsString("), rate_limited(default(mmapfs(" + dataPaths[1].toAbsolutePath().toString().toLowerCase(Locale.ROOT)));
- }
- assertThat(storeString, endsWith(", type=MERGE, rate=20.0)])"));
- createIndexWithoutRateLimitingStoreType("test", IndexStoreModule.Type.NIOFS, "least_used");
- storeString = getStoreDirectory("test", 0).toString();
- logger.info(storeString);
- dataPaths = dataPaths();
- assertThat(storeString.toLowerCase(Locale.ROOT), startsWith("store(least_used[niofs(" + dataPaths[0].toAbsolutePath().toString().toLowerCase(Locale.ROOT)));
- if (dataPaths.length > 1) {
- assertThat(storeString.toLowerCase(Locale.ROOT), containsString("), niofs(" + dataPaths[1].toAbsolutePath().toString().toLowerCase(Locale.ROOT)));
- }
- assertThat(storeString, endsWith(")])"));
- }
- private void createIndexWithStoreType(String index, IndexStoreModule.Type storeType, String distributor) {
- cluster().wipeIndices(index);
- client().admin().indices().prepareCreate(index)
- .setSettings(settingsBuilder()
- .put("index.store.distributor", distributor)
- .put("index.store.type", storeType.name())
- .put("index.number_of_replicas", 0)
- .put("index.number_of_shards", 1)
- .put("index.store.throttle.type", "merge")
- .put("index.store.throttle.max_bytes_per_sec", "20mb")
- )
- .execute().actionGet();
- assertThat(client().admin().cluster().prepareHealth("test").setWaitForGreenStatus().execute().actionGet().isTimedOut(), equalTo(false));
- }
- private void createIndexWithoutRateLimitingStoreType(String index, IndexStoreModule.Type storeType, String distributor) {
- cluster().wipeIndices(index);
- client().admin().indices().prepareCreate(index)
- .setSettings(settingsBuilder()
- .put("index.store.distributor", distributor)
- .put("index.store.type", storeType)
- .put("index.store.throttle.type", "none")
- .put("index.number_of_replicas", 0)
- .put("index.number_of_shards", 1)
- )
- .execute().actionGet();
- assertThat(client().admin().cluster().prepareHealth("test").setWaitForGreenStatus().execute().actionGet().isTimedOut(), equalTo(false));
- }
- private Path[] dataPaths() {
- Set nodes = internalCluster().nodesInclude("test");
- assertThat(nodes.isEmpty(), equalTo(false));
- NodeEnvironment env = internalCluster().getInstance(NodeEnvironment.class, nodes.iterator().next());
- return env.nodeDataPaths();
- }
- private Directory getStoreDirectory(String index, int shardId) {
- Set nodes = internalCluster().nodesInclude("test");
- assertThat(nodes.isEmpty(), equalTo(false));
- IndicesService indicesService = internalCluster().getInstance(IndicesService.class, nodes.iterator().next());
- IndexShard indexShard = indicesService.indexService(index).shardSafe(shardId);
- return indexShard.store().directory();
- }
diff --git a/src/test/java/org/elasticsearch/indices/store/StrictDistributor.java b/src/test/java/org/elasticsearch/indices/store/StrictDistributor.java
deleted file mode 100644
index 1229ef27475c3..0000000000000
--- a/src/test/java/org/elasticsearch/indices/store/StrictDistributor.java
+++ /dev/null
@@ -1,55 +0,0 @@
-package org.elasticsearch.indices.store;
-import org.apache.lucene.store.Directory;
-import org.elasticsearch.common.inject.Inject;
-import org.elasticsearch.index.store.DirectoryService;
-import org.elasticsearch.index.store.distributor.AbstractDistributor;
-import java.io.IOException;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.greaterThan;
- *
- */
-public class StrictDistributor extends AbstractDistributor {
- @Inject
- public StrictDistributor(DirectoryService directoryService) throws IOException {
- super(directoryService);
- }
- @Override
- public Directory doAny() throws IOException {
- for (Directory delegate : delegates) {
- assertThat(getUsableSpace(delegate), greaterThan(0L));
- }
- return primary();
- }
- @Override
- public String name() {
- return "strict";
- }
\ No newline at end of file
diff --git a/src/test/java/org/elasticsearch/recovery/RelocationTests.java b/src/test/java/org/elasticsearch/recovery/RelocationTests.java
index 299c61108b77b..9cc9baaadc072 100644
--- a/src/test/java/org/elasticsearch/recovery/RelocationTests.java
+++ b/src/test/java/org/elasticsearch/recovery/RelocationTests.java
@@ -32,7 +32,6 @@
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.search.SearchPhaseExecutionException;
import org.elasticsearch.action.search.SearchResponse;
-import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterService;
@@ -596,22 +595,24 @@ public void run() {
for (String node : internalCluster().getNodeNames()) {
NodeEnvironment nodeEnvironment = internalCluster().getInstance(NodeEnvironment.class, node);
for (final Path shardLoc : nodeEnvironment.shardPaths(new ShardId(indexName, 0))) {
- assertBusy(new Runnable() {
- @Override
- public void run() {
- try {
- Files.walkFileTree(shardLoc, new SimpleFileVisitor() {
- @Override
- public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
- assertThat("found a temporary recovery file: " + file, file.getFileName().toString(), not(startsWith("recovery.")));
- return FileVisitResult.CONTINUE;
- }
- });
- } catch (IOException e) {
- throw new AssertionError("failed to walk file tree starting at [" + shardLoc + "]", e);
+ if (Files.exists(shardLoc)) {
+ assertBusy(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ Files.walkFileTree(shardLoc, new SimpleFileVisitor() {
+ @Override
+ public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
+ assertThat("found a temporary recovery file: " + file, file.getFileName().toString(), not(startsWith("recovery.")));
+ return FileVisitResult.CONTINUE;
+ }
+ });
+ } catch (IOException e) {
+ throw new AssertionError("failed to walk file tree starting at [" + shardLoc + "]", e);
+ }
- }
- });
+ });
+ }
diff --git a/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java b/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java
index 28f92be21ef70..f3e0773ecce88 100644
--- a/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java
+++ b/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java
@@ -1918,7 +1918,7 @@ protected String routingKeyForShard(String index, String type, int shard) {
* Return settings that could be used to start a node that has the given zipped home directory.
- protected Settings prepareBackwardsDataDir(Path backwardsIndex, Object... settings) throws IOException {
+ public Settings prepareBackwardsDataDir(Path backwardsIndex, Object... settings) throws IOException {
Path indexDir = newTempDirPath();
Path dataDir = indexDir.resolve("data");
try (InputStream stream = Files.newInputStream(backwardsIndex)) {
diff --git a/src/test/java/org/elasticsearch/test/ElasticsearchTestCase.java b/src/test/java/org/elasticsearch/test/ElasticsearchTestCase.java
index db3e021f94519..0f132d2416342 100644
--- a/src/test/java/org/elasticsearch/test/ElasticsearchTestCase.java
+++ b/src/test/java/org/elasticsearch/test/ElasticsearchTestCase.java
@@ -572,7 +572,7 @@ public static Path newTempDirPath(LifecycleScope scope) {
* Returns a random number of temporary paths.
public String[] tmpPaths() {
- final int numPaths = randomIntBetween(1, 3);
+ final int numPaths = randomIntBetween(1, 5);
final String[] absPaths = new String[numPaths];
for (int i = 0; i < numPaths; i++) {
absPaths[i] = newTempDirPath().toAbsolutePath().toString();
diff --git a/src/test/java/org/elasticsearch/test/InternalTestCluster.java b/src/test/java/org/elasticsearch/test/InternalTestCluster.java
index 701b7fa8479e1..51fda6659c861 100644
--- a/src/test/java/org/elasticsearch/test/InternalTestCluster.java
+++ b/src/test/java/org/elasticsearch/test/InternalTestCluster.java
@@ -256,9 +256,10 @@ public InternalTestCluster(long clusterSeed,
logger.info("Setup InternalTestCluster [{}] with seed [{}] using [{}] data nodes and [{}] client nodes", clusterName, SeedUtils.formatSeed(clusterSeed), numSharedDataNodes, numSharedClientNodes);
this.settingsSource = settingsSource;
Builder builder = ImmutableSettings.settingsBuilder();
- if (random.nextInt(5) == 0) { // sometimes set this
+ // nocommit - currently we force multi data dir
+ if (random.nextInt(5) == 0 || true) { // sometimes set this
// randomize (multi/single) data path, special case for 0, don't set it at all...
- final int numOfDataPaths = random.nextInt(5);
+ final int numOfDataPaths = 1 + random.nextInt(5);
if (numOfDataPaths > 0) {
StringBuilder dataPath = new StringBuilder();
for (int i = 0; i < numOfDataPaths; i++) {
diff --git a/src/test/java/org/elasticsearch/test/store/MockDirectoryHelper.java b/src/test/java/org/elasticsearch/test/store/MockDirectoryHelper.java
index a2b323225077b..7f8d42ca3445f 100644
--- a/src/test/java/org/elasticsearch/test/store/MockDirectoryHelper.java
+++ b/src/test/java/org/elasticsearch/test/store/MockDirectoryHelper.java
@@ -29,6 +29,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.index.shard.ShardPath;
import org.elasticsearch.index.store.DirectoryService;
import org.elasticsearch.index.store.IndexStore;
import org.elasticsearch.index.store.fs.*;
@@ -91,28 +92,21 @@ public Directory wrap(Directory dir) {
return w;
- public Directory[] wrapAllInplace(Directory[] dirs) {
- for (int i = 0; i < dirs.length; i++) {
- dirs[i] = wrap(dirs[i]);
- }
- return dirs;
- }
- public FsDirectoryService randomDirectorService(IndexStore indexStore) {
+ public FsDirectoryService randomDirectorService(IndexStore indexStore, ShardPath path) {
if ((Constants.WINDOWS || Constants.SUN_OS) && Constants.JRE_IS_64BIT && MMapDirectory.UNMAP_SUPPORTED) {
- return new MmapFsDirectoryService(shardId, indexSettings, indexStore);
+ return new MmapFsDirectoryService(shardId, indexSettings, indexStore, path);
} else if (Constants.WINDOWS) {
- return new SimpleFsDirectoryService(shardId, indexSettings, indexStore);
+ return new SimpleFsDirectoryService(shardId, indexSettings, indexStore, path);
switch (random.nextInt(4)) {
case 2:
- return new DefaultFsDirectoryService(shardId, indexSettings, indexStore);
+ return new DefaultFsDirectoryService(shardId, indexSettings, indexStore, path);
case 1:
- return new MmapFsDirectoryService(shardId, indexSettings, indexStore);
+ return new MmapFsDirectoryService(shardId, indexSettings, indexStore, path);
case 0:
- return new SimpleFsDirectoryService(shardId, indexSettings, indexStore);
+ return new SimpleFsDirectoryService(shardId, indexSettings, indexStore, path);
- return new NioFsDirectoryService(shardId, indexSettings, indexStore);
+ return new NioFsDirectoryService(shardId, indexSettings, indexStore, path);
diff --git a/src/test/java/org/elasticsearch/test/store/MockFSDirectoryService.java b/src/test/java/org/elasticsearch/test/store/MockFSDirectoryService.java
index 618e23bc569cf..9ea258ceafe43 100644
--- a/src/test/java/org/elasticsearch/test/store/MockFSDirectoryService.java
+++ b/src/test/java/org/elasticsearch/test/store/MockFSDirectoryService.java
@@ -26,25 +26,17 @@
import org.apache.lucene.store.LockFactory;
import org.apache.lucene.store.StoreRateLimiting;
import org.apache.lucene.util.AbstractRandomizedTest;
-import org.apache.lucene.util.IOUtils;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
-import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.index.engine.Engine;
-import org.elasticsearch.index.engine.InternalEngine;
import org.elasticsearch.index.settings.IndexSettings;
-import org.elasticsearch.index.shard.IndexShardException;
-import org.elasticsearch.index.shard.IndexShardState;
-import org.elasticsearch.index.shard.ShardId;
-import org.elasticsearch.index.shard.IndexShard;
+import org.elasticsearch.index.shard.*;
import org.elasticsearch.index.store.IndexStore;
import org.elasticsearch.index.store.Store;
-import org.elasticsearch.index.store.distributor.Distributor;
import org.elasticsearch.index.store.fs.FsDirectoryService;
import org.elasticsearch.indices.IndicesLifecycle;
import org.elasticsearch.indices.IndicesService;
@@ -69,14 +61,14 @@ public class MockFSDirectoryService extends FsDirectoryService {
private final boolean checkIndexOnClose;
- public MockFSDirectoryService(final ShardId shardId, @IndexSettings Settings indexSettings, IndexStore indexStore, final IndicesService service) {
- super(shardId, indexSettings, indexStore);
+ public MockFSDirectoryService(final ShardId shardId, @IndexSettings Settings indexSettings, IndexStore indexStore, final IndicesService service, final ShardPath path) {
+ super(shardId, indexSettings, indexStore, path);
final long seed = indexSettings.getAsLong(ElasticsearchIntegrationTest.SETTING_INDEX_SEED, 0l);
Random random = new Random(seed);
helper = new MockDirectoryHelper(shardId, indexSettings, logger, random, seed);
checkIndexOnClose = indexSettings.getAsBoolean(CHECK_INDEX_ON_CLOSE, true);
- delegateService = helper.randomDirectorService(indexStore);
+ delegateService = helper.randomDirectorService(indexStore, path);
if (checkIndexOnClose) {
final IndicesLifecycle.Listener listener = new IndicesLifecycle.Listener() {
@@ -112,9 +104,11 @@ public void afterIndexShardClosed(ShardId sid, @Nullable IndexShard indexShard,
- public Directory[] build() throws IOException {
- return delegateService.build();
+ public Directory newDirectory() throws IOException {
+ return helper.wrap(delegateService.newDirectory());
@@ -175,9 +169,4 @@ public StoreRateLimiting rateLimiting() {
public long throttleTimeInNanos() {
return delegateService.throttleTimeInNanos();
- @Override
- public Directory newFromDistributor(Distributor distributor) throws IOException {
- return helper.wrap(super.newFromDistributor(distributor));
- }