Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add search node repurpose commands #6517

Merged
merged 1 commit into from
Mar 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- The truncation limit of the OpenSearchJsonLayout logger is now configurable ([#6569](https://github.com/opensearch-project/OpenSearch/pull/6569))
- Add 'base_path' setting to File System Repository ([#6558](https://github.com/opensearch-project/OpenSearch/pull/6558))
- Return success on DeletePits when no PITs exist. ([#6544](https://github.com/opensearch-project/OpenSearch/pull/6544))
- Add node repurpose command for search nodes ([#6517](https://github.com/opensearch-project/OpenSearch/pull/6517))

### Dependencies
- Bump `org.apache.logging.log4j:log4j-core` from 2.18.0 to 2.20.0 ([#6490](https://github.com/opensearch-project/OpenSearch/pull/6490))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1307,7 +1307,9 @@ static List<Path> collectFileCacheDataPath(NodePath fileCacheNodePath) throws IO
for (Path indexPath : indexStream) {
if (Files.isDirectory(indexPath)) {
try (Stream<Path> shardStream = Files.list(indexPath)) {
shardStream.map(Path::toAbsolutePath).forEach(indexSubPaths::add);
shardStream.filter(NodeEnvironment::isShardPath)
.map(Path::toAbsolutePath)
.forEach(indexSubPaths::add);
}
}
}
Expand Down
195 changes: 153 additions & 42 deletions server/src/main/java/org/opensearch/env/NodeRepurposeCommand.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@

import java.io.IOException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
Expand All @@ -57,6 +58,7 @@
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import static org.opensearch.env.NodeEnvironment.CACHE_FOLDER;
import static org.opensearch.env.NodeEnvironment.INDICES_FOLDER;

/**
Expand All @@ -68,12 +70,14 @@ public class NodeRepurposeCommand extends OpenSearchNodeCommand {

static final String ABORTED_BY_USER_MSG = OpenSearchNodeCommand.ABORTED_BY_USER_MSG;
static final String FAILED_TO_OBTAIN_NODE_LOCK_MSG = OpenSearchNodeCommand.FAILED_TO_OBTAIN_NODE_LOCK_MSG;
static final String NO_CLEANUP = "Node has node.data=true -> no clean up necessary";
static final String NO_CLEANUP = "Node has node.data=true and node.search=true -> no clean up necessary";
static final String NO_DATA_TO_CLEAN_UP_FOUND = "No data to clean-up found";
static final String NO_SHARD_DATA_TO_CLEAN_UP_FOUND = "No shard data to clean-up found";
static final String NO_FILE_CACHE_DATA_TO_CLEAN_UP_FOUND = "No file cache to clean-up found";
private static final int FILE_CACHE_NODE_PATH_LOCATION = 0;

public NodeRepurposeCommand() {
super("Repurpose this node to another cluster-manager/data role, cleaning up any excess persisted data");
super("Repurpose this node to another cluster-manager/data/search role, cleaning up any excess persisted data");
}

void testExecute(Terminal terminal, OptionSet options, Environment env) throws Exception {
Expand All @@ -83,7 +87,7 @@ void testExecute(Terminal terminal, OptionSet options, Environment env) throws E
@Override
protected boolean validateBeforeLock(Terminal terminal, Environment env) {
Settings settings = env.settings();
if (DiscoveryNode.isDataNode(settings)) {
if (DiscoveryNode.isDataNode(settings) && DiscoveryNode.isSearchNode(settings)) {
terminal.println(Terminal.Verbosity.NORMAL, NO_CLEANUP);
return false;
}
Expand All @@ -94,85 +98,186 @@ protected boolean validateBeforeLock(Terminal terminal, Environment env) {
@Override
protected void processNodePaths(Terminal terminal, Path[] dataPaths, int nodeLockId, OptionSet options, Environment env)
throws IOException {
assert DiscoveryNode.isDataNode(env.settings()) == false;
assert DiscoveryNode.isDataNode(env.settings()) == false || DiscoveryNode.isSearchNode(env.settings()) == false;

boolean repurposeData = DiscoveryNode.isDataNode(env.settings()) == false;
boolean repurposeSearch = DiscoveryNode.isSearchNode(env.settings()) == false;

if (DiscoveryNode.isClusterManagerNode(env.settings()) == false) {
processNoClusterManagerNoDataNode(terminal, dataPaths, env);
processNoClusterManagerRepurposeNode(terminal, dataPaths, env, repurposeData, repurposeSearch);
} else {
processClusterManagerNoDataNode(terminal, dataPaths, env);
processClusterManagerRepurposeNode(terminal, dataPaths, env, repurposeData, repurposeSearch);
}
}

private void processNoClusterManagerNoDataNode(Terminal terminal, Path[] dataPaths, Environment env) throws IOException {
private void processNoClusterManagerRepurposeNode(
Terminal terminal,
Path[] dataPaths,
Environment env,
boolean repurposeData,
boolean repurposeSearch
) throws IOException {
NodeEnvironment.NodePath[] nodePaths = toNodePaths(dataPaths);
NodeEnvironment.NodePath fileCacheNodePath = toNodePaths(dataPaths)[FILE_CACHE_NODE_PATH_LOCATION];
final PersistedClusterStateService persistedClusterStateService = createPersistedClusterStateService(env.settings(), dataPaths);
final Metadata metadata = loadClusterState(terminal, env, persistedClusterStateService).metadata();

terminal.println(Terminal.Verbosity.VERBOSE, "Collecting shard data paths");
List<Path> shardDataPaths = NodeEnvironment.collectShardDataPaths(nodePaths);
Set<Path> indexPaths = Set.of();
List<Path> shardDataPaths = List.of();
Set<Path> fileCachePaths = Set.of();
List<Path> fileCacheDataPaths = List.of();

terminal.println(Terminal.Verbosity.VERBOSE, "Collecting index metadata paths");
List<Path> indexMetadataPaths = NodeEnvironment.collectIndexMetadataPaths(nodePaths);

Set<Path> indexPaths = uniqueParentPaths(shardDataPaths, indexMetadataPaths);
if (repurposeData) {
terminal.println(Terminal.Verbosity.VERBOSE, "Collecting shard data paths");
shardDataPaths = NodeEnvironment.collectShardDataPaths(nodePaths);
indexPaths = uniqueParentPaths(shardDataPaths, indexMetadataPaths);
}

final PersistedClusterStateService persistedClusterStateService = createPersistedClusterStateService(env.settings(), dataPaths);
if (repurposeSearch) {
terminal.println(Terminal.Verbosity.VERBOSE, "Collecting file cache data paths");
fileCacheDataPaths = NodeEnvironment.collectFileCacheDataPath(fileCacheNodePath);
fileCachePaths = uniqueParentPaths(fileCacheDataPaths, indexMetadataPaths);
}

final Metadata metadata = loadClusterState(terminal, env, persistedClusterStateService).metadata();
if (indexPaths.isEmpty() && metadata.indices().isEmpty()) {
if (repurposeData && repurposeSearch && fileCacheDataPaths.isEmpty() && indexPaths.isEmpty() && metadata.indices().isEmpty()) {
terminal.println(Terminal.Verbosity.NORMAL, NO_DATA_TO_CLEAN_UP_FOUND);
return;
} else if (repurposeData && !repurposeSearch && indexPaths.isEmpty() && metadata.indices().isEmpty()) {
terminal.println(Terminal.Verbosity.NORMAL, NO_DATA_TO_CLEAN_UP_FOUND);
return;
} else if (!repurposeData && repurposeSearch && fileCacheDataPaths.isEmpty() && metadata.indices().isEmpty()) {
terminal.println(NO_FILE_CACHE_DATA_TO_CLEAN_UP_FOUND);
return;
}

final Set<String> indexUUIDs = Sets.union(
indexUUIDsFor(indexPaths),
StreamSupport.stream(metadata.indices().values().spliterator(), false)
.map(imd -> imd.value.getIndexUUID())
.collect(Collectors.toSet())
indexUUIDsFor(fileCachePaths),
Sets.union(
indexUUIDsFor(indexPaths),
StreamSupport.stream(metadata.indices().values().spliterator(), false)
.map(imd -> imd.value.getIndexUUID())
.collect(Collectors.toSet())
)
);

outputVerboseInformation(terminal, indexPaths, indexUUIDs, metadata);

terminal.println(noClusterManagerMessage(indexUUIDs.size(), shardDataPaths.size(), indexMetadataPaths.size()));
List<Path> cleanUpPaths = new ArrayList<>(shardDataPaths);
cleanUpPaths.addAll(fileCacheDataPaths);
outputVerboseInformation(terminal, cleanUpPaths, indexUUIDs, metadata);
terminal.println(noClusterManagerMessage(indexUUIDs.size(), cleanUpPaths.size(), indexMetadataPaths.size()));
outputHowToSeeVerboseInformation(terminal);

terminal.println("Node is being re-purposed as no-cluster-manager and no-data. Clean-up of index data will be performed.");
if (repurposeData && repurposeSearch) {
terminal.println(
"Node is being re-purposed as no-cluster-manager, no-data and no-search. Clean-up of index data and file cache will be performed."
);
} else if (repurposeData) {
terminal.println("Node is being re-purposed as no-cluster-manager and no-data. Clean-up of index data will be performed.");
} else if (repurposeSearch) {
terminal.println(
"Node is being re-purposed as no-cluster-manager and no-search. Clean-up of file cache and corresponding index metadata will be performed."
);
}
confirm(terminal, "Do you want to proceed?");

removePaths(terminal, indexPaths); // clean-up shard dirs
// clean-up all metadata dirs
MetadataStateFormat.deleteMetaState(dataPaths);
IOUtils.rm(Stream.of(dataPaths).map(path -> path.resolve(INDICES_FOLDER)).toArray(Path[]::new));
if (repurposeData) {
removePaths(terminal, indexPaths); // clean-up shard dirs
IOUtils.rm(Stream.of(dataPaths).map(path -> path.resolve(INDICES_FOLDER)).toArray(Path[]::new));
}

if (repurposeSearch) {
removePaths(terminal, fileCachePaths); // clean-up file cache dirs
IOUtils.rm(dataPaths[FILE_CACHE_NODE_PATH_LOCATION].resolve(CACHE_FOLDER));
}

terminal.println("Node successfully repurposed to no-cluster-manager and no-data.");
if (repurposeData && repurposeSearch) {
terminal.println("Node successfully repurposed to no-cluster-manager, no-data and no-search.");
} else if (repurposeData) {
terminal.println("Node successfully repurposed to no-cluster-manager and no-data.");
} else if (repurposeSearch) {
terminal.println("Node successfully repurposed to no-cluster-manager and no-search.");
}
Comment on lines +197 to +203
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like you could avoid this explosion of cases with something like:

roleText.add("no-cluster-manager");
if (repurposeData) roleText.add("no-data");
if (repurposeSearch) roleText.add("no-search");
terminal.println("Node successfully repurposed to: " + String.join(",", roleText));

Same applies in the other cases though might require more refactoring. What do you think?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought of doing that, but this makes it more readable and traceable. Given its use case and frequency of use, I think we would rather benefit from this approach.

}

private void processClusterManagerNoDataNode(Terminal terminal, Path[] dataPaths, Environment env) throws IOException {
private void processClusterManagerRepurposeNode(
Terminal terminal,
Path[] dataPaths,
Environment env,
boolean repurposeData,
boolean repurposeSearch
) throws IOException {
NodeEnvironment.NodePath[] nodePaths = toNodePaths(dataPaths);
NodeEnvironment.NodePath fileCacheNodePath = toNodePaths(dataPaths)[FILE_CACHE_NODE_PATH_LOCATION];
final PersistedClusterStateService persistedClusterStateService = createPersistedClusterStateService(env.settings(), dataPaths);
final Metadata metadata = loadClusterState(terminal, env, persistedClusterStateService).metadata();

terminal.println(Terminal.Verbosity.VERBOSE, "Collecting shard data paths");
List<Path> shardDataPaths = NodeEnvironment.collectShardDataPaths(nodePaths);
if (shardDataPaths.isEmpty()) {
terminal.println(NO_SHARD_DATA_TO_CLEAN_UP_FOUND);
return;
}
Set<Path> indexPaths = Set.of();
List<Path> shardDataPaths = List.of();
Set<Path> fileCachePaths = Set.of();
List<Path> fileCacheDataPaths = List.of();

final PersistedClusterStateService persistedClusterStateService = createPersistedClusterStateService(env.settings(), dataPaths);
if (repurposeData) {
terminal.println(Terminal.Verbosity.VERBOSE, "Collecting shard data paths");
shardDataPaths = NodeEnvironment.collectShardDataPaths(nodePaths);
indexPaths = uniqueParentPaths(shardDataPaths);
}

final Metadata metadata = loadClusterState(terminal, env, persistedClusterStateService).metadata();
if (repurposeSearch) {
terminal.println(Terminal.Verbosity.VERBOSE, "Collecting file cache data paths");
fileCacheDataPaths = NodeEnvironment.collectFileCacheDataPath(fileCacheNodePath);
fileCachePaths = uniqueParentPaths(fileCacheDataPaths);
}

final Set<Path> indexPaths = uniqueParentPaths(shardDataPaths);
final Set<String> indexUUIDs = indexUUIDsFor(indexPaths);
if (repurposeData && repurposeSearch && shardDataPaths.isEmpty() && fileCacheDataPaths.isEmpty()) {
terminal.println(NO_SHARD_DATA_TO_CLEAN_UP_FOUND);
return;
} else if (repurposeData && !repurposeSearch && shardDataPaths.isEmpty()) {
terminal.println(NO_SHARD_DATA_TO_CLEAN_UP_FOUND);
return;
} else if (!repurposeData && repurposeSearch && fileCacheDataPaths.isEmpty()) {
terminal.println(NO_FILE_CACHE_DATA_TO_CLEAN_UP_FOUND);
return;
}

outputVerboseInformation(terminal, shardDataPaths, indexUUIDs, metadata);
final Set<String> indexUUIDs = Sets.union(indexUUIDsFor(indexPaths), indexUUIDsFor(fileCachePaths));

terminal.println(shardMessage(shardDataPaths.size(), indexUUIDs.size()));
List<Path> cleanUpPaths = new ArrayList<>(shardDataPaths);
cleanUpPaths.addAll(fileCacheDataPaths);
outputVerboseInformation(terminal, cleanUpPaths, indexUUIDs, metadata);
terminal.println(shardMessage(cleanUpPaths.size(), indexUUIDs.size()));
outputHowToSeeVerboseInformation(terminal);

terminal.println("Node is being re-purposed as cluster-manager and no-data. Clean-up of shard data will be performed.");
if (repurposeData && repurposeSearch) {
terminal.println(
"Node is being re-purposed as cluster-manager, no-data and no-search. Clean-up of shard data and file cache data will be performed."
);
} else if (repurposeData) {
terminal.println("Node is being re-purposed as cluster-manager and no-data. Clean-up of shard data will be performed.");
} else if (repurposeSearch) {
terminal.println("Node is being re-purposed as cluster-manager and no-search. Clean-up of file cache data will be performed.");
}

confirm(terminal, "Do you want to proceed?");

removePaths(terminal, shardDataPaths); // clean-up shard dirs
if (repurposeData) {
removePaths(terminal, shardDataPaths); // clean-up shard dirs
}

if (repurposeSearch) {
removePaths(terminal, fileCacheDataPaths); // clean-up file cache dirs
}

terminal.println("Node successfully repurposed to cluster-manager and no-data.");
if (repurposeData && repurposeSearch) {
terminal.println("Node successfully repurposed to cluster-manager, no-data and no-search.");
} else if (repurposeData) {
terminal.println("Node successfully repurposed to cluster-manager and no-data.");
} else if (repurposeSearch) {
terminal.println("Node successfully repurposed to cluster-manager and no-search.");
}
}

private ClusterState loadClusterState(Terminal terminal, Environment env, PersistedClusterStateService psf) throws IOException {
Expand Down Expand Up @@ -211,11 +316,17 @@ private Set<String> indexUUIDsFor(Set<Path> indexPaths) {
}

static String noClusterManagerMessage(int indexes, int shards, int indexMetadata) {
return "Found " + indexes + " indices (" + shards + " shards and " + indexMetadata + " index meta data) to clean up";
return "Found "
+ indexes
+ " indices ("
+ shards
+ " shards/file cache folders and "
+ indexMetadata
+ " index meta data) to clean up";
}

static String shardMessage(int shards, int indices) {
return "Found " + shards + " shards in " + indices + " indices to clean up";
return "Found " + shards + " shards/file cache folders in " + indices + " indices to clean up";
}

private void removePaths(Terminal terminal, Collection<Path> paths) {
Expand Down
Loading