Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make nodePaths() singular #72514

Merged
merged 4 commits into from
May 2, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
import java.util.Arrays;
import java.util.EnumSet;
import java.util.Map;
import java.util.Objects;

public abstract class ElasticsearchNodeCommand extends EnvironmentAwareCommand {
private static final Logger logger = LogManager.getLogger(ElasticsearchNodeCommand.class);
Expand Down Expand Up @@ -131,12 +130,11 @@ public static Tuple<Long, ClusterState> loadTermAndClusterState(PersistedCluster
protected void processNodePaths(Terminal terminal, OptionSet options, Environment env) throws IOException, UserException {
terminal.println(Terminal.Verbosity.VERBOSE, "Obtaining lock for node");
try (NodeEnvironment.NodeLock lock = new NodeEnvironment.NodeLock(logger, env, Files::exists)) {
final Path[] dataPaths =
Arrays.stream(lock.getNodePaths()).filter(Objects::nonNull).map(p -> p.path).toArray(Path[]::new);
if (dataPaths.length == 0) {
final NodeEnvironment.NodePath dataPath = lock.getNodePath();
if (dataPath == null) {
throw new ElasticsearchException(NO_NODE_FOLDER_FOUND_MSG);
}
processNodePaths(terminal, dataPaths, options, env);
processNodePaths(terminal, new Path[] { dataPath.path }, options, env);
} catch (LockObtainFailedException e) {
throw new ElasticsearchException(FAILED_TO_OBTAIN_NODE_LOCK_MSG, e);
}
Expand Down
108 changes: 53 additions & 55 deletions server/src/main/java/org/elasticsearch/env/NodeEnvironment.java
Original file line number Diff line number Diff line change
Expand Up @@ -220,8 +220,8 @@ public NodeLock(final Logger logger,
}
}

public NodePath[] getNodePaths() {
return nodePaths;
public NodePath getNodePath() {
return nodePaths[0];
}

@Override
Expand Down Expand Up @@ -352,67 +352,65 @@ private static boolean upgradeLegacyNodeFolders(Logger logger, Settings settings
}

// move contents from legacy path to new path
assert nodeLock.getNodePaths().length == legacyNodeLock.getNodePaths().length;
try {
final List<CheckedRunnable<IOException>> upgradeActions = new ArrayList<>();
for (int i = 0; i < legacyNodeLock.getNodePaths().length; i++) {
final NodePath legacyNodePath = legacyNodeLock.getNodePaths()[i];
final NodePath nodePath = nodeLock.getNodePaths()[i];

// determine folders to move and check that there are no extra files/folders
final Set<String> folderNames = new HashSet<>();
final Set<String> expectedFolderNames = new HashSet<>(Arrays.asList(

// node state directory, containing MetadataStateFormat-based node metadata as well as cluster state
MetadataStateFormat.STATE_DIR_NAME,

// indices
INDICES_FOLDER,

// searchable snapshot cache Lucene index
SNAPSHOT_CACHE_FOLDER
));

try (DirectoryStream<Path> stream = Files.newDirectoryStream(legacyNodePath.path)) {
for (Path subFolderPath : stream) {
final String fileName = subFolderPath.getFileName().toString();
if (FileSystemUtils.isDesktopServicesStore(subFolderPath)) {
// ignore
} else if (FileSystemUtils.isAccessibleDirectory(subFolderPath, logger)) {
if (expectedFolderNames.contains(fileName) == false) {
throw new IllegalStateException("unexpected folder encountered during data folder upgrade: " +
subFolderPath);
}
final Path targetSubFolderPath = nodePath.path.resolve(fileName);
if (Files.exists(targetSubFolderPath)) {
throw new IllegalStateException("target folder already exists during data folder upgrade: " +
targetSubFolderPath);
}
folderNames.add(fileName);
} else if (fileName.equals(NODE_LOCK_FILENAME) == false &&
fileName.equals(TEMP_FILE_NAME) == false) {
throw new IllegalStateException("unexpected file/folder encountered during data folder upgrade: " +
final NodePath legacyNodePath = legacyNodeLock.getNodePath();
final NodePath nodePath = nodeLock.getNodePath();

// determine folders to move and check that there are no extra files/folders
final Set<String> folderNames = new HashSet<>();
final Set<String> expectedFolderNames = new HashSet<>(Arrays.asList(

// node state directory, containing MetadataStateFormat-based node metadata as well as cluster state
MetadataStateFormat.STATE_DIR_NAME,

// indices
INDICES_FOLDER,

// searchable snapshot cache Lucene index
SNAPSHOT_CACHE_FOLDER
));

try (DirectoryStream<Path> stream = Files.newDirectoryStream(legacyNodePath.path)) {
for (Path subFolderPath : stream) {
final String fileName = subFolderPath.getFileName().toString();
if (FileSystemUtils.isDesktopServicesStore(subFolderPath)) {
// ignore
} else if (FileSystemUtils.isAccessibleDirectory(subFolderPath, logger)) {
if (expectedFolderNames.contains(fileName) == false) {
throw new IllegalStateException("unexpected folder encountered during data folder upgrade: " +
subFolderPath);
}
final Path targetSubFolderPath = nodePath.path.resolve(fileName);
if (Files.exists(targetSubFolderPath)) {
throw new IllegalStateException("target folder already exists during data folder upgrade: " +
targetSubFolderPath);
}
folderNames.add(fileName);
} else if (fileName.equals(NODE_LOCK_FILENAME) == false &&
fileName.equals(TEMP_FILE_NAME) == false) {
throw new IllegalStateException("unexpected file/folder encountered during data folder upgrade: " +
subFolderPath);
}
}
}

assert Sets.difference(folderNames, expectedFolderNames).isEmpty() :
"expected indices and/or state dir folder but was " + folderNames;
assert Sets.difference(folderNames, expectedFolderNames).isEmpty() :
"expected indices and/or state dir folder but was " + folderNames;

upgradeActions.add(() -> {
for (String folderName : folderNames) {
final Path sourceSubFolderPath = legacyNodePath.path.resolve(folderName);
final Path targetSubFolderPath = nodePath.path.resolve(folderName);
Files.move(sourceSubFolderPath, targetSubFolderPath, StandardCopyOption.ATOMIC_MOVE);
logger.info("data folder upgrade: moved from [{}] to [{}]", sourceSubFolderPath, targetSubFolderPath);
}
IOUtils.fsync(nodePath.path, true);
});

upgradeActions.add(() -> {
for (String folderName : folderNames) {
final Path sourceSubFolderPath = legacyNodePath.path.resolve(folderName);
final Path targetSubFolderPath = nodePath.path.resolve(folderName);
Files.move(sourceSubFolderPath, targetSubFolderPath, StandardCopyOption.ATOMIC_MOVE);
logger.info("data folder upgrade: moved from [{}] to [{}]", sourceSubFolderPath, targetSubFolderPath);
}
IOUtils.fsync(nodePath.path, true);
});
}
// now do the actual upgrade. start by upgrading the node metadata file before moving anything, since a downgrade in an
// intermediate state would be pretty disastrous
loadNodeMetadata(settings, logger, legacyNodeLock.getNodePaths());
loadNodeMetadata(settings, logger, legacyNodeLock.getNodePath());
for (CheckedRunnable<IOException> upgradeAction : upgradeActions) {
upgradeAction.run();
}
Expand Down Expand Up @@ -920,12 +918,12 @@ public String nodeId() {
/**
* Returns the single {@link NodePath} used by this node.
*/
public NodePath[] nodePaths() {
public NodePath nodePath() {
assertEnvIsLocked();
if (nodePaths == null || locks == null) {
throw new IllegalStateException("node is not configured to store local location");
}
return nodePaths;
return nodePaths[0];
}

/**
Expand Down
76 changes: 5 additions & 71 deletions server/src/main/java/org/elasticsearch/index/shard/ShardPath.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,17 @@

import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.util.Strings;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.env.ShardLock;
import org.elasticsearch.index.IndexSettings;

import java.io.IOException;
import java.math.BigInteger;
import java.nio.file.FileStore;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;
Expand Down Expand Up @@ -192,85 +189,22 @@ public static ShardPath selectNewPathForShard(NodeEnvironment env, ShardId shard

if (indexSettings.hasCustomDataPath()) {
dataPath = env.resolveCustomLocation(indexSettings.customDataPath(), shardId);
statePath = env.nodePaths()[0].resolve(shardId);
statePath = env.nodePath().resolve(shardId);
} else {
BigInteger totFreeSpace = BigInteger.ZERO;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the whole else block can be replaced by this one line:

            dataPath = statePath = env.nodePath().resolve(shardId);

The comments etc. are not important, nor is the checking of the free space.

for (NodeEnvironment.NodePath nodePath : env.nodePaths()) {
totFreeSpace = totFreeSpace.add(BigInteger.valueOf(nodePath.fileStore.getUsableSpace()));
}
NodeEnvironment.NodePath nodePath = env.nodePath();
totFreeSpace = totFreeSpace.add(BigInteger.valueOf(nodePath.fileStore.getUsableSpace()));

// TODO: this is a hack!! We should instead keep track of incoming (relocated) shards since we know
// how large they will be once they're done copying, instead of a silly guess for such cases:

// Very rough heuristic of how much disk space we expect the shard will use over its lifetime, the max of current average
// shard size across the cluster and 5% of the total available free space on this node:
BigInteger estShardSizeInBytes = BigInteger.valueOf(avgShardSizeInBytes).max(totFreeSpace.divide(BigInteger.valueOf(20)));

// TODO - do we need something more extensible? Yet, this does the job for now...
final NodeEnvironment.NodePath[] paths = env.nodePaths();

// If no better path is chosen, use the one with the most space by default
NodeEnvironment.NodePath bestPath = getPathWithMostFreeSpace(env);

if (paths.length != 1) {
Map<NodeEnvironment.NodePath, Long> pathToShardCount = env.shardCountPerPath(shardId.getIndex());

// Compute how much space there is on each path
final Map<NodeEnvironment.NodePath, BigInteger> pathsToSpace = new HashMap<>(paths.length);
for (NodeEnvironment.NodePath nodePath : paths) {
FileStore fileStore = nodePath.fileStore;
BigInteger usableBytes = BigInteger.valueOf(fileStore.getUsableSpace());
pathsToSpace.put(nodePath, usableBytes);
}

bestPath = Arrays.stream(paths)
// Filter out paths that have enough space
.filter((path) -> pathsToSpace.get(path).subtract(estShardSizeInBytes).compareTo(BigInteger.ZERO) > 0)
// Sort by the number of shards for this index
.sorted((p1, p2) -> {
int cmp = Long.compare(pathToShardCount.getOrDefault(p1, 0L),
pathToShardCount.getOrDefault(p2, 0L));
if (cmp == 0) {
// if the number of shards is equal, tie-break with the number of total shards
cmp = Integer.compare(dataPathToShardCount.getOrDefault(p1.path, 0),
dataPathToShardCount.getOrDefault(p2.path, 0));
if (cmp == 0) {
// if the number of shards is equal, tie-break with the usable bytes
cmp = pathsToSpace.get(p2).compareTo(pathsToSpace.get(p1));
}
}
return cmp;
})
// Return the first result
.findFirst()
// Or the existing best path if there aren't any that fit the criteria
.orElse(bestPath);
}

statePath = bestPath.resolve(shardId);
statePath = nodePath.resolve(shardId);
dataPath = statePath;
}
return new ShardPath(indexSettings.hasCustomDataPath(), dataPath, statePath, shardId);
}

/**
 * Selects the data path whose backing file store currently reports the most usable bytes.
 *
 * @param env the node environment supplying the candidate {@link NodeEnvironment.NodePath}s
 * @return the path with the greatest usable space; on ties the earliest path wins
 * @throws IOException if querying a file store's usable space fails
 */
static NodeEnvironment.NodePath getPathWithMostFreeSpace(NodeEnvironment env) throws IOException {
    NodeEnvironment.NodePath winner = null;
    long winnerUsableBytes = Long.MIN_VALUE;
    for (final NodeEnvironment.NodePath candidate : env.nodePaths()) {
        // NB usable bytes doesn't account for reserved space (e.g. incoming recoveries)
        final long usableBytes = candidate.fileStore.getUsableSpace();
        assert usableBytes >= 0 : "usable bytes must be >= 0, got: " + usableBytes;
        // strictly-greater comparison means an earlier path is kept when space is equal
        if (winner == null || usableBytes > winnerUsableBytes) {
            winnerUsableBytes = usableBytes;
            winner = candidate;
        }
    }
    return winner;
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand Down
15 changes: 5 additions & 10 deletions server/src/main/java/org/elasticsearch/monitor/fs/FsProbe.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,22 +42,17 @@ public FsInfo stats(FsInfo previous) throws IOException {
if (nodeEnv.hasNodeFile() == false) {
return new FsInfo(System.currentTimeMillis(), null, new FsInfo.Path[0]);
}
NodePath[] dataLocations = nodeEnv.nodePaths();
FsInfo.Path[] paths = new FsInfo.Path[dataLocations.length];
for (int i = 0; i < dataLocations.length; i++) {
paths[i] = getFSInfo(dataLocations[i]);
}
NodePath dataLocation = nodeEnv.nodePath();
FsInfo.Path pathInfo = getFSInfo(dataLocation);
FsInfo.IoStats ioStats = null;
if (Constants.LINUX) {
Set<Tuple<Integer, Integer>> devicesNumbers = new HashSet<>();
for (int i = 0; i < dataLocations.length; i++) {
if (dataLocations[i].majorDeviceNumber != -1 && dataLocations[i].minorDeviceNumber != -1) {
devicesNumbers.add(Tuple.tuple(dataLocations[i].majorDeviceNumber, dataLocations[i].minorDeviceNumber));
}
if (dataLocation.majorDeviceNumber != -1 && dataLocation.minorDeviceNumber != -1) {
devicesNumbers.add(Tuple.tuple(dataLocation.majorDeviceNumber, dataLocation.minorDeviceNumber));
}
ioStats = ioStats(devicesNumbers, previous);
}
return new FsInfo(System.currentTimeMillis(), ioStats, paths);
return new FsInfo(System.currentTimeMillis(), ioStats, new FsInfo.Path[] { pathInfo });
}

final FsInfo.IoStats ioStats(final Set<Tuple<Integer, Integer>> devicesNumbers, final FsInfo previous) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ public void testDeleteSafe() throws Exception {
SetOnce<Path> listener = new SetOnce<>();
env.deleteShardDirectorySafe(new ShardId(index, 1), idxSettings, listener::set);
Path deletedPath = listener.get();
assertThat(deletedPath, equalTo(env.nodePaths()[0].resolve(index).resolve("1")));
assertThat(deletedPath, equalTo(env.nodePath().resolve(index).resolve("1")));
}

path = env.indexPath(index);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,7 @@
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collections;
import java.util.Objects;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
Expand Down Expand Up @@ -133,8 +131,8 @@ public void setup() throws IOException {
clusterState = ClusterState.builder(ClusterName.DEFAULT).metadata(Metadata.builder().put(indexMetadata, false).build()).build();

try (NodeEnvironment.NodeLock lock = new NodeEnvironment.NodeLock(logger, environment, Files::exists)) {
final Path[] dataPaths = Arrays.stream(lock.getNodePaths()).filter(Objects::nonNull).map(p -> p.path).toArray(Path[]::new);
try (PersistedClusterStateService.Writer writer = new PersistedClusterStateService(dataPaths, nodeId,
final NodeEnvironment.NodePath dataPath = lock.getNodePath();
try (PersistedClusterStateService.Writer writer = new PersistedClusterStateService(new Path[] { dataPath.path }, nodeId,
xContentRegistry(), BigArrays.NON_RECYCLING_INSTANCE,
new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), () -> 0L).createWriter()) {
writer.writeFullStateAndCommit(1L, clusterState);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public class TransportGetAutoscalingCapacityActionIT extends AutoscalingIntegTes
public void testCurrentCapacity() throws Exception {
assertThat(capacity().results().keySet(), Matchers.empty());
long memory = OsProbe.getInstance().getTotalPhysicalMemorySize();
long storage = internalCluster().getInstance(NodeEnvironment.class).nodePaths()[0].fileStore.getTotalSpace();
long storage = internalCluster().getInstance(NodeEnvironment.class).nodePath().fileStore.getTotalSpace();
assertThat(memory, greaterThan(0L));
assertThat(storage, greaterThan(0L));
putAutoscalingPolicy("test");
Expand Down
Loading