Skip to content

Commit

Permalink
fix file cache initialization
Browse files Browse the repository at this point in the history
Signed-off-by: panguixin <[email protected]>
  • Loading branch information
bugmakerrrrrr committed Jun 5, 2024
1 parent 39ae32a commit 328da32
Show file tree
Hide file tree
Showing 7 changed files with 83 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,10 @@ public static boolean isSearchNode(Settings settings) {
return hasRole(settings, DiscoveryNodeRole.SEARCH_ROLE);
}

public static boolean isDedicatedSearchNode(Settings settings) {
return getRolesFromSettings(settings).stream().allMatch(DiscoveryNodeRole.SEARCH_ROLE::equals);
}

private final String nodeName;
private final String nodeId;
private final String ephemeralId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,15 @@ private static final int ceilingNextPowerOfTwo(int x) {
private final Weigher<V> weigher;

public SegmentedCache(Builder<K, V> builder) {
this.capacity = builder.capacity;
final int segments = ceilingNextPowerOfTwo(builder.concurrencyLevel);
this.segmentMask = segments - 1;
this.table = newSegmentArray(segments);
this.perSegmentCapacity = (capacity + (segments - 1)) / segments;
this.perSegmentCapacity = (builder.capacity + (segments - 1)) / segments;
this.weigher = builder.weigher;
for (int i = 0; i < table.length; i++) {
table[i] = new LRUCache<>(perSegmentCapacity, builder.listener, builder.weigher);
}
this.capacity = perSegmentCapacity * segments;
}

@SuppressWarnings("unchecked")
Expand Down
17 changes: 16 additions & 1 deletion server/src/main/java/org/opensearch/monitor/fs/FsProbe.java
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,15 @@ public FsInfo stats(FsInfo previous) throws IOException {
if (fileCache != null && dataLocations[i].fileCacheReservedSize != ByteSizeValue.ZERO) {
paths[i].fileCacheReserved = adjustForHugeFilesystems(dataLocations[i].fileCacheReservedSize.getBytes());
paths[i].fileCacheUtilized = adjustForHugeFilesystems(fileCache.usage().usage());
paths[i].available -= (paths[i].fileCacheReserved - paths[i].fileCacheUtilized);
// fileCacheFree will be less than zero if the cache being over-subscribed
long fileCacheFree = paths[i].fileCacheReserved - paths[i].fileCacheUtilized;
if (fileCacheFree > 0) {
paths[i].available -= fileCacheFree;
}
// If the reserved space for file caching is being used by other files, such as local indices or heap dumps.
if (paths[i].available < 0) {
paths[i].available = 0;
}
}
}
FsInfo.IoStats ioStats = null;
Expand Down Expand Up @@ -211,4 +219,11 @@ public static FsInfo.Path getFSInfo(NodePath nodePath) throws IOException {
return fsPath;
}

public static long getTotalSize(NodePath nodePath) throws IOException {
return adjustForHugeFilesystems(nodePath.fileStore.getTotalSpace());
}

public static long getAvailableSize(NodePath nodePath) throws IOException {
return adjustForHugeFilesystems(nodePath.fileStore.getUsableSpace());
}
}
82 changes: 52 additions & 30 deletions server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.opensearch.Build;
import org.opensearch.ExceptionsHelper;
import org.opensearch.OpenSearchException;
import org.opensearch.OpenSearchParseException;
import org.opensearch.OpenSearchTimeoutException;
import org.opensearch.Version;
import org.opensearch.action.ActionModule;
Expand Down Expand Up @@ -108,6 +109,7 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.common.settings.SettingsException;
import org.opensearch.common.settings.SettingsModule;
import org.opensearch.common.unit.RatioValue;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.BigArrays;
import org.opensearch.common.util.FeatureFlags;
Expand Down Expand Up @@ -175,7 +177,6 @@
import org.opensearch.ingest.IngestService;
import org.opensearch.monitor.MonitorService;
import org.opensearch.monitor.fs.FsHealthService;
import org.opensearch.monitor.fs.FsInfo;
import org.opensearch.monitor.fs.FsProbe;
import org.opensearch.monitor.jvm.JvmInfo;
import org.opensearch.node.remotestore.RemoteStoreNodeService;
Expand Down Expand Up @@ -371,9 +372,12 @@ public class Node implements Closeable {
}
}, Setting.Property.NodeScope);

public static final Setting<ByteSizeValue> NODE_SEARCH_CACHE_SIZE_SETTING = Setting.byteSizeSetting(
private static final String ZERO = "0";

public static final Setting<String> NODE_SEARCH_CACHE_SIZE_SETTING = new Setting<>(
"node.search.cache.size",
ByteSizeValue.ZERO,
s -> DiscoveryNode.isDedicatedSearchNode(s) ? "80%" : ZERO,
Node::validateFileCacheSize,
Property.NodeScope
);

Expand Down Expand Up @@ -1995,39 +1999,57 @@ DiscoveryNode getNode() {
* Initializes the search cache with a defined capacity.
* The capacity of the cache is based on user configuration for {@link Node#NODE_SEARCH_CACHE_SIZE_SETTING}.
* If the user doesn't configure the cache size, it fails if the node is a data + search node.
* Else it configures the size to 80% of available capacity for a dedicated search node, if not explicitly defined.
* Else it configures the size to 80% of total capacity for a dedicated search node, if not explicitly defined.
*/
private void initializeFileCache(Settings settings, CircuitBreaker circuitBreaker) throws IOException {
if (DiscoveryNode.isSearchNode(settings)) {
NodeEnvironment.NodePath fileCacheNodePath = nodeEnvironment.fileCacheNodePath();
long capacity = NODE_SEARCH_CACHE_SIZE_SETTING.get(settings).getBytes();
FsInfo.Path info = ExceptionsHelper.catchAsRuntimeException(() -> FsProbe.getFSInfo(fileCacheNodePath));
long availableCapacity = info.getAvailable().getBytes();

// Initialize default values for cache if NODE_SEARCH_CACHE_SIZE_SETTING is not set.
if (capacity == 0) {
// If node is not a dedicated search node without configuration, prevent cache initialization
if (DiscoveryNode.getRolesFromSettings(settings).stream().anyMatch(role -> !DiscoveryNodeRole.SEARCH_ROLE.equals(role))) {
throw new SettingsException(
"Unable to initialize the "
+ DiscoveryNodeRole.SEARCH_ROLE.roleName()
+ "-"
+ DiscoveryNodeRole.DATA_ROLE.roleName()
+ " node: Missing value for configuration "
+ NODE_SEARCH_CACHE_SIZE_SETTING.getKey()
);
} else {
capacity = 80 * availableCapacity / 100;
}
if (DiscoveryNode.isSearchNode(settings) == false) {
return;
}

String capacityRaw = NODE_SEARCH_CACHE_SIZE_SETTING.get(settings);
if (capacityRaw.equals(ZERO)) {
throw new SettingsException(
"Unable to initialize the "
+ DiscoveryNodeRole.SEARCH_ROLE.roleName()
+ "-"
+ DiscoveryNodeRole.DATA_ROLE.roleName()
+ " node: Missing value for configuration "
+ NODE_SEARCH_CACHE_SIZE_SETTING.getKey()
);
}

NodeEnvironment.NodePath fileCacheNodePath = nodeEnvironment.fileCacheNodePath();
long totalSpace = ExceptionsHelper.catchAsRuntimeException(() -> FsProbe.getTotalSize(fileCacheNodePath));
long capacity = calculateFileCacheSize(capacityRaw, totalSpace);
if (capacity <= 0 || totalSpace <= capacity) {
throw new SettingsException("Cache size must be larger than zero and less than total capacity");
}

this.fileCache = FileCacheFactory.createConcurrentLRUFileCache(capacity, circuitBreaker);
fileCacheNodePath.fileCacheReservedSize = new ByteSizeValue(this.fileCache.capacity(), ByteSizeUnit.BYTES);
List<Path> fileCacheDataPaths = collectFileCacheDataPath(fileCacheNodePath);
this.fileCache.restoreFromDirectory(fileCacheDataPaths);
}

private static long calculateFileCacheSize(String capacityRaw, long totalSpace) {
try {
RatioValue ratioValue = RatioValue.parseRatioValue(capacityRaw);
return Math.round(totalSpace * ratioValue.getAsRatio());
} catch (OpenSearchParseException e) {
try {
return ByteSizeValue.parseBytesSizeValue(capacityRaw, NODE_SEARCH_CACHE_SIZE_SETTING.getKey()).getBytes();
} catch (OpenSearchParseException ex) {
ex.addSuppressed(e);
throw ex;
}
capacity = Math.min(capacity, availableCapacity);
fileCacheNodePath.fileCacheReservedSize = new ByteSizeValue(capacity, ByteSizeUnit.BYTES);
this.fileCache = FileCacheFactory.createConcurrentLRUFileCache(capacity, circuitBreaker);
List<Path> fileCacheDataPaths = collectFileCacheDataPath(fileCacheNodePath);
this.fileCache.restoreFromDirectory(fileCacheDataPaths);
}
}

private static String validateFileCacheSize(String capacityRaw) {
calculateFileCacheSize(capacityRaw, 0L);
return capacityRaw;
}

/**
* Returns the {@link FileCache} instance for remote search node
* Note: Visible for testing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public void createNodePaths() throws IOException {
dataClusterManagerSettings = buildEnvSettings(Settings.EMPTY);
Settings defaultSearchSettings = Settings.builder()
.put(dataClusterManagerSettings)
.put(NODE_SEARCH_CACHE_SIZE_SETTING.getKey(), new ByteSizeValue(16, ByteSizeUnit.GB))
.put(NODE_SEARCH_CACHE_SIZE_SETTING.getKey(), new ByteSizeValue(16, ByteSizeUnit.GB).toString())
.build();

searchNoDataNoClusterManagerSettings = onlyRole(dataClusterManagerSettings, DiscoveryNodeRole.SEARCH_ROLE);
Expand Down
2 changes: 1 addition & 1 deletion server/src/test/java/org/opensearch/node/NodeTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ public void testCreateWithFileCache() throws Exception {
List<Class<? extends Plugin>> plugins = basePlugins();
ByteSizeValue cacheSize = new ByteSizeValue(16, ByteSizeUnit.GB);
Settings searchRoleSettingsWithConfig = baseSettings().put(searchRoleSettings)
.put(Node.NODE_SEARCH_CACHE_SIZE_SETTING.getKey(), cacheSize)
.put(Node.NODE_SEARCH_CACHE_SIZE_SETTING.getKey(), cacheSize.toString())
.build();
Settings onlySearchRoleSettings = Settings.builder()
.put(searchRoleSettingsWithConfig)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@
import static org.opensearch.test.NodeRoles.onlyRoles;
import static org.opensearch.test.NodeRoles.removeRoles;
import static org.opensearch.test.OpenSearchTestCase.assertBusy;
import static org.opensearch.test.OpenSearchTestCase.randomBoolean;
import static org.opensearch.test.OpenSearchTestCase.randomFrom;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
Expand Down Expand Up @@ -216,7 +217,8 @@ public final class InternalTestCluster extends TestCluster {
nodeAndClient.node.settings()
);

private static final ByteSizeValue DEFAULT_SEARCH_CACHE_SIZE = new ByteSizeValue(2, ByteSizeUnit.GB);
private static final String DEFAULT_SEARCH_CACHE_SIZE_BYTES = "2gb";
private static final String DEFAULT_SEARCH_CACHE_SIZE_PERCENT = "5%";

public static final int DEFAULT_LOW_NUM_CLUSTER_MANAGER_NODES = 1;
public static final int DEFAULT_HIGH_NUM_CLUSTER_MANAGER_NODES = 3;
Expand Down Expand Up @@ -700,8 +702,10 @@ public synchronized void ensureAtLeastNumSearchAndDataNodes(int n) {
logger.info("increasing cluster size from {} to {}", size, n);
Set<DiscoveryNodeRole> searchAndDataRoles = Set.of(DiscoveryNodeRole.DATA_ROLE, DiscoveryNodeRole.SEARCH_ROLE);
Settings settings = Settings.builder()
.put(Settings.EMPTY)
.put(Node.NODE_SEARCH_CACHE_SIZE_SETTING.getKey(), DEFAULT_SEARCH_CACHE_SIZE)
.put(
Node.NODE_SEARCH_CACHE_SIZE_SETTING.getKey(),
randomBoolean() ? DEFAULT_SEARCH_CACHE_SIZE_PERCENT : DEFAULT_SEARCH_CACHE_SIZE_BYTES
)
.build();
startNodes(n - size, Settings.builder().put(onlyRoles(settings, searchAndDataRoles)).build());
validateClusterFormed();
Expand Down

0 comments on commit 328da32

Please sign in to comment.